Skip to content
This repository has been archived by the owner on Oct 15, 2022. It is now read-only.

Event streaming #183

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
45cf22e
Adding 'Reason' data for Started events
nyarly Oct 15, 2019
50e7812
Message passing skeleton for streaming events to clients
nyarly Oct 15, 2019
9fcb3b3
Removing listener channels as they expire
nyarly Oct 16, 2019
59ef6bf
Added stream_events_ command
nyarly Oct 16, 2019
63641a5
Live events and example notify script
nyarly Oct 21, 2019
3621c19
Snapshot output and example prompt script
nyarly Oct 21, 2019
98b8d54
Readable serialization for log lines
nyarly Nov 7, 2019
f3f6936
Resolving rebase conflicts
nyarly Nov 8, 2019
9acce20
Finishing
nyarly Nov 8, 2019
dc603c9
Nicer error when daemon not available
nyarly Nov 8, 2019
7bee0f0
Rebase cleanup
nyarly Nov 14, 2019
f260176
Moving from Heartbeat to infinite client timeout
nyarly Nov 28, 2019
5f7d4fe
Merge remote-tracking branch 'upstream/master' into stream_events
nyarly Nov 28, 2019
109ec1c
Cleaning up
nyarly Nov 28, 2019
a227d9a
fmt
nyarly Nov 28, 2019
f859e04
Merge remote-tracking branch 'upstream/master' into stream_events
nyarly Jan 11, 2020
ccd1f9d
One test failing
nyarly Jan 22, 2020
4a354cd
debugging output
nyarly Jan 23, 2020
4c55de3
Tying channels together appropriately
nyarly Jan 25, 2020
943485d
Merge remote-tracking branch 'upstream/master' into stream_events
nyarly Jan 25, 2020
bd15b50
Debugging output, extending worker threads
nyarly Jan 26, 2020
35e15be
Cleaning up
nyarly Jan 27, 2020
7e75861
Deadline timer in client
nyarly Jan 28, 2020
6b57420
Fix for thread exhaustion
nyarly Jan 28, 2020
fbb15ab
Merge remote-tracking branch 'upstream/master' into stream_events
nyarly Jan 31, 2020
f1ebd02
Transition to using BuildError
nyarly Jan 31, 2020
767a016
Downcasing NixFile JSON
nyarly Jan 31, 2020
5bcfcf1
Merge branch 'master' into stream_events
nyarly Jan 31, 2020
e49bf56
Simplifying log line handling
nyarly Jan 31, 2020
b116a15
Removing example scripts for time being
nyarly Jan 31, 2020
4bc7006
Review response
nyarly Jan 31, 2020
98f136e
Merge remote-tracking branch 'upstream/master' into stream_events
nyarly Jan 31, 2020
e464081
Post merge cleanup
nyarly Jan 31, 2020
5f4bac2
Format and linting
nyarly Jan 31, 2020
398b9ed
Nicer names for varlink kinds
nyarly Jan 31, 2020
5563ab2
Improving comments on varlink interface
nyarly Jan 31, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions example/notify.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#! /usr/bin/env nix-shell
nyarly marked this conversation as resolved.
Show resolved Hide resolved
#! nix-shell -i bash -p jq findutils libnotify

lorri stream_events_ --kind live |\
jq --unbuffered \
'((.completed?|values|"Build complete in \(.nix_file)"),
(.failure? |values|"Build failed in \(.nix_file)"))' |\
tee /dev/stderr |\
xargs -n 1 notify-send "Lorri Build"
29 changes: 29 additions & 0 deletions example/prompt.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#! /usr/bin/env -S shellHook= nix-shell
#! nix-shell -i bash -p jq findutils libnotify

# As part of PS1 command, likely you don't want this output

shell_nix=${1:-${NIX_SHELL_PATH:?"Usage: $0 <path to shell.nix>"}}
shell_nix=$(realpath "$shell_nix")
complete_glyph=✔
building_glyph=⏳
failed_glyph=✘
unknown_glyph="¿"

glyph=$(lorri stream_events_ -k snapshot | jq -r \
"(if .completed?.nix_file == \"$shell_nix\" then \"${complete_glyph}\" else null end),
(if .failure?.nix_file == \"$shell_nix\" then \"${failed_glyph}\" else null end),
(if .started?.nix_file == \"$shell_nix\" then \"${building_glyph}\" else null end) | values")

if [ -z "$glyph" ]; then
echo -n $unknown_glyph
exit 1
fi

echo -n "$glyph"

if [ "$glyph" = "$failed_glyph" ]; then
exit 1
fi

exit 0
181 changes: 162 additions & 19 deletions src/build_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,156 @@
//! evaluate and build a given Nix file.

use crate::builder;
use crate::daemon::LoopHandlerEvent;
use crate::error::BuildError;
use crate::pathreduction::reduce_paths;
use crate::project::roots;
use crate::project::roots::Roots;
use crate::project::Project;
use crate::watch::{DebugMessage, EventError, Reason, Watch};
use crate::NixFile;
use crossbeam_channel as chan;
use serde::{
de::{self, Visitor},
Deserialize, Deserializer, Serialize, Serializer,
};
use slog_scope::{debug, warn};
use std::path::PathBuf;

/// Builder events sent back over `BuildLoop.tx`.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Event {
/// The build has started
Started(Reason),
/// The build completed successfully
Completed(BuildResults),
/// The build command returned a failing exit status
Failure(BuildError),
/// Demarks a stream of events from recent history becoming live
SectionEnd,
/// A build has started
Started {
/// The shell.nix file for the building project
nix_file: NixFile,
/// The reason the build started
reason: Reason,
},
/// A build completed successfully
Completed {
/// The shell.nix file for the building project
nix_file: NixFile,
/// The result of the build
result: BuildResults,
},
/// A build command returned a failing exit status
Failure {
/// The shell.nix file for the building project
nix_file: NixFile,
/// The error that exited the build
failure: BuildError,
},
}

/// Results of a single, successful build.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuildResults {
/// See `build::Info.outputPaths
pub output_paths: builder::OutputPaths<roots::RootPath>,
}

/// Results of a single, failing build.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildExitFailure {
/// stderr log output
pub log_lines: Vec<LogLine>,
}

/// A line from stderr log output
#[derive(Debug, Clone)]
pub struct LogLine(std::ffi::OsString);

impl Serialize for LogLine {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let LogLine(oss) = self;
serializer.serialize_str(&*oss.to_string_lossy())
}
}

impl<'de> Deserialize<'de> for LogLine {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
use std::fmt;

struct LLVisitor;

impl<'de> Visitor<'de> for LLVisitor {
type Value = LogLine;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string")
}

fn visit_str<E>(self, value: &str) -> Result<LogLine, E>
where
E: de::Error,
{
Ok(LogLine(std::ffi::OsString::from(value)))
}
}

deserializer.deserialize_str(LLVisitor)
}
}

impl From<std::ffi::OsString> for LogLine {
fn from(oss: std::ffi::OsString) -> Self {
LogLine(oss)
}
}

impl From<LogLine> for std::ffi::OsString {
fn from(ll: LogLine) -> Self {
ll.0
}
}
nyarly marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(test)]
mod tests {
use super::Event;
use crate::error::BuildError;
use crate::NixFile;
use serde_json;

fn build_failure() -> Event {
Event::Failure {
nix_file: NixFile::Shell(std::path::PathBuf::from("/somewhere/shell.nix")),
failure: BuildError::Exit {
cmd: "ebs".to_string(),
status: Some(1),
logs: vec![
std::ffi::OsString::from("this is a test of the emergency broadcast system"),
std::ffi::OsString::from("you will hear a tone"),
std::ffi::OsString::from("remember, this is only a test"),
],
},
}
}

#[test]
fn logline_json_readable() -> Result<(), serde_json::Error> {
// just don't explode, you know?
assert!(serde_json::to_string(&build_failure())?.contains("emergency"));
Ok(())
}

#[test]
fn logline_json_roundtrip() -> Result<(), serde_json::Error> {
// just don't explode, you know?
serde_json::from_str::<serde_json::Value>(&serde_json::to_string(&build_failure())?)
.map(|_| ())
}
}

/// The BuildLoop repeatedly builds the Nix expression in
/// `project` each time a source file influencing
/// a previous build changes.
Expand Down Expand Up @@ -57,8 +179,11 @@ impl<'a> BuildLoop<'a> {
/// When new filesystem changes are detected while a build is
/// still running, it is finished first before starting a new build.
#[allow(clippy::drop_copy, clippy::zero_ptr)] // triggered by `select!`
pub fn forever(&mut self, tx: chan::Sender<Event>, rx_ping: chan::Receiver<()>) {
let send = |msg| tx.send(msg).expect("Failed to send an event");
pub fn forever(&mut self, tx: chan::Sender<LoopHandlerEvent>, rx_ping: chan::Receiver<()>) {
let send = |msg| {
debug!("BuildLoop sending"; "message" => format!("{:#?}", msg));
nyarly marked this conversation as resolved.
Show resolved Hide resolved
tx.send(msg).expect("Failed to send an event")
};
let translate_reason = |rsn| match rsn {
Ok(rsn) => rsn,
// we should continue and just cite an unknown reason
Expand All @@ -76,9 +201,10 @@ impl<'a> BuildLoop<'a> {
};

// The project has just been added, so run the builder in the first iteration
let mut reason = Some(Event::Started(Reason::ProjectAdded(
self.project.nix_file.clone(),
)));
let mut reason = Some(Event::Started {
nix_file: self.project.nix_file.clone(),
reason: Reason::ProjectAdded(self.project.nix_file.clone()),
});
let mut output_paths = None;

// Drain pings initially: we're going to trigger a first build anyway
Expand All @@ -89,15 +215,27 @@ impl<'a> BuildLoop<'a> {
loop {
// If there is some reason to build, run the build!
if let Some(rsn) = reason {
send(rsn);
send(rsn.into());
match self.once() {
Ok(result) => {
output_paths = Some(result.output_paths.clone());
send(Event::Completed(result));
send(
Event::Completed {
nix_file: self.project.nix_file.clone(),
result,
}
.into(),
);
}
Err(e) if e.is_actionable() => send(Event::Failure(e)),
Err(e) if e.is_actionable() => send(
Event::Failure {
nix_file: self.project.nix_file.clone(),
failure: e,
}
.into(),
),
Err(e) => {
panic!("Unrecoverable error:\n{}", e);
panic!("Unrecoverable error:\n{:#?}", e);
}
}
reason = None;
Expand All @@ -106,12 +244,17 @@ impl<'a> BuildLoop<'a> {
chan::select! {
recv(rx_notify) -> msg => if let Ok(msg) = msg {
if let Some(rsn) = self.watch.process(msg) {
reason = Some(Event::Started(translate_reason(rsn)));
reason = Some(Event::Started{
nix_file: self.project.nix_file.clone(),
reason: translate_reason(rsn)
});
}
},
recv(rx_ping) -> msg => if let (Ok(()), Some(output_paths)) = (msg, &output_paths) {
if !output_paths.shell_gc_root_is_dir() {
reason = Some(Event::Started(Reason::PingReceived));
reason = Some(Event::Started{
nix_file: self.project.nix_file.clone(),
reason: Reason::PingReceived});
}
},
}
Expand Down
2 changes: 1 addition & 1 deletion src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ where
}

/// Output paths generated by `logged-evaluation.nix`
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputPaths<T> {
/// Shell path modified to work as a gc root
pub shell_gc_root: T,
Expand Down
13 changes: 13 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Defines the CLI interface using structopt.

use crate::ops::stream_events::EventKind;
use std::path::PathBuf;

#[derive(StructOpt, Debug)]
Expand Down Expand Up @@ -49,6 +50,10 @@ pub enum Command {
#[structopt(name = "ping_")]
Ping_(Ping_),

/// (plumbing) Ask the lorri daemon to report build events as they occur
#[structopt(name = "stream_events_")]
StreamEvents_(StreamEvents_),

/// Upgrade Lorri
#[structopt(name = "self-upgrade", alias = "self-update")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alias "self-update" was removed in a recent PR. If I had to guess, I'd say that re-introducing it here was not intentional?

Upgrade(UpgradeTo),
Expand Down Expand Up @@ -122,6 +127,14 @@ pub struct Ping_ {
pub nix_file: PathBuf,
}

/// Stream events from the daemon.
#[derive(StructOpt, Debug)]
pub struct StreamEvents_ {
#[structopt(short, long, default_value = "all")]
nyarly marked this conversation as resolved.
Show resolved Hide resolved
/// The kind of events to report
pub kind: EventKind,
}

/// A stub struct to represent how what we want to upgrade to.
#[derive(StructOpt, Debug)]
#[structopt(name = "basic")]
Expand Down
32 changes: 32 additions & 0 deletions src/com.target.lorri.varlink
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,38 @@ type ShellNix (
path: string
)

# Monitor the daemon. The method will reply with an update whenever a build begins or ends.
# Montior will immediately reply with a snapshot of known projects, then a marker event,
# indicating that the stream of events is now "live."
method Monitor() -> (event: Event)

type Event (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Varlink does not provide proper enums or gRPC-like oneof. But I think that events can be modelled in a slightly more "type-safe" way. What do you think about this sort of structure:

type Event {
    kind: (section_end, started),
    section_end: SectionEnd?, # set iff kind == section_end
    started: Started?, # set iff kind == started
    # ...
}

where SectionEnd, Started, etc. have only non-optional fields?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be interesting to experiment, but I think I need to put it into a followup PR.

kind: (sectionend, started, completed, failure),
nyarly marked this conversation as resolved.
Show resolved Hide resolved
nix_file: ?ShellNix, # not in sectionend
nyarly marked this conversation as resolved.
Show resolved Hide resolved
reason: ?Reason, # only if started
result: ?Outcome, # only if completed
nyarly marked this conversation as resolved.
Show resolved Hide resolved
failure: ?Failure # only if failure
)

type Reason (
kind: (projectadded, pingreceived, fileschanged, unknown),
nyarly marked this conversation as resolved.
Show resolved Hide resolved
project: ?ShellNix,
files: ?[]string,
debug: ?string
)

type Outcome (
project_root: string
)

type Failure (
kind: (io, spawn, exit, output),
msg: ?string,
cmd: ?string,
status: ?int,
logs: ?[]string
)

# WatchServices establishes a stream with the daemon. Initially, the daemon
# evaluates the given services definition to an array of Command objects and
# sends a reply for each of them. After this initial evaluation, the daemon
Expand Down
Loading