Skip to content

Commit

Permalink
Merge pull request #1165 from obsidiansystems/factor-out-proto
Browse files Browse the repository at this point in the history
Begin factoring out the protocol code
  • Loading branch information
Ericson2314 authored Jan 22, 2024
2 parents db7aa01 + 4ac31c8 commit 34c51fc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 61 deletions.
64 changes: 24 additions & 40 deletions src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "build-result.hh"
#include "path.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh"
#include "current-process.hh"
#include "processes.hh"
Expand Down Expand Up @@ -123,13 +124,10 @@ static void copyClosureTo(
garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes;
ServeProto::write(destStore, conn, closure);
conn.to.flush();

/* Get back the set of paths that are already valid on the remote
host. */
auto present = ServeProto::Serialise<StorePathSet>::read(destStore, conn);
auto present = conn.queryValidPaths(
destStore, true, closure, useSubstitutes);

if (present.size() == closure.size()) return;

Expand Down Expand Up @@ -195,33 +193,6 @@ static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, cons
return {std::move(logFile), std::move(logFD)};
}

/**
* @param conn is not fully initialized; it is this functions job to set
* the `remoteVersion` field after the handshake is completed.
* Therefore, no `ServeProto::Serialize` functions can be used until
* that field is set.
*/
static void handshake(Machine::Connection & conn, unsigned int repeats)
{
constexpr ServeProto::Version our_version = 0x206;

conn.to << SERVE_MAGIC_1 << our_version;
conn.to.flush();

unsigned int magic = readInt(conn.from);
if (magic != SERVE_MAGIC_2)
throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", conn.machine->sshName);
conn.remoteVersion = readInt(conn.from);
// Now `conn` is initialized.
if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200)
throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", conn.machine->sshName);
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0)
throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName);

// Do not attempt to speak a newer version of the protocol
conn.remoteVersion = std::min(conn.remoteVersion, our_version);
}

static BasicDerivation sendInputs(
State & state,
Step & step,
Expand Down Expand Up @@ -291,10 +262,7 @@ static BuildResult performBuild(
counter & nrStepsBuilding
)
{
conn.to << ServeProto::Command::BuildDerivation << localStore.printStorePath(drvPath);
writeDerivation(conn.to, localStore, drv);
ServeProto::write(localStore, conn, options);
conn.to.flush();
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);

BuildResult result;

Expand Down Expand Up @@ -498,24 +466,40 @@ void State::buildRemote(ref<Store> destStore,
});

Machine::Connection conn {
.from = child.from.get(),
.to = child.to.get(),
.machine = machine,
{
.to = child.to.get(),
.from = child.from.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};

Finally updateStats([&]() {
bytesReceived += conn.from.read;
bytesSent += conn.to.written;
});

constexpr ServeProto::Version our_version = 0x206;

try {
build_remote::handshake(conn, buildOptions.nrRepeats);
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->sshName);
} catch (EndOfFile & e) {
child.pid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s);
}

// Do not attempt to speak a newer version of the protocol.
//
// Per https://github.com/NixOS/nix/issues/9584 should be handled as
// part of `handshake` in upstream nix.
conn.remoteVersion = std::min(conn.remoteVersion, our_version);

{
auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0;
Expand Down
23 changes: 2 additions & 21 deletions src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "sync.hh"
#include "nar-extractor.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"


typedef unsigned int BuildID;
Expand Down Expand Up @@ -302,29 +303,9 @@ struct Machine
}

// A connection to a machine
struct Connection {
nix::FdSource from;
nix::FdSink to;
nix::ServeProto::Version remoteVersion;

struct Connection : nix::ServeProto::BasicClientConnection {
// Backpointer to the machine
ptr machine;

operator nix::ServeProto::ReadConn ()
{
return {
.from = from,
.version = remoteVersion,
};
}

operator nix::ServeProto::WriteConn ()
{
return {
.to = to,
.version = remoteVersion,
};
}
};
};

Expand Down

0 comments on commit 34c51fc

Please sign in to comment.