Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Begin factoring out the protocol code #1165

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 24 additions & 40 deletions src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "build-result.hh"
#include "path.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh"
#include "current-process.hh"
#include "processes.hh"
Expand Down Expand Up @@ -123,13 +124,10 @@ static void copyClosureTo(
garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes;
ServeProto::write(destStore, conn, closure);
conn.to.flush();

/* Get back the set of paths that are already valid on the remote
host. */
auto present = ServeProto::Serialise<StorePathSet>::read(destStore, conn);
auto present = conn.queryValidPaths(
destStore, true, closure, useSubstitutes);

if (present.size() == closure.size()) return;

Expand Down Expand Up @@ -195,33 +193,6 @@ static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, cons
return {std::move(logFile), std::move(logFD)};
}

/**
* @param conn is not fully initialized; it is this functions job to set
* the `remoteVersion` field after the handshake is completed.
* Therefore, no `ServeProto::Serialize` functions can be used until
* that field is set.
*/
static void handshake(Machine::Connection & conn, unsigned int repeats)
{
constexpr ServeProto::Version our_version = 0x206;

conn.to << SERVE_MAGIC_1 << our_version;
conn.to.flush();

unsigned int magic = readInt(conn.from);
if (magic != SERVE_MAGIC_2)
throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", conn.machine->sshName);
conn.remoteVersion = readInt(conn.from);
// Now `conn` is initialized.
if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200)
throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", conn.machine->sshName);
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0)
throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName);

// Do not attempt to speak a newer version of the protocol
conn.remoteVersion = std::min(conn.remoteVersion, our_version);
}

static BasicDerivation sendInputs(
State & state,
Step & step,
Expand Down Expand Up @@ -291,10 +262,7 @@ static BuildResult performBuild(
counter & nrStepsBuilding
)
{
conn.to << ServeProto::Command::BuildDerivation << localStore.printStorePath(drvPath);
writeDerivation(conn.to, localStore, drv);
ServeProto::write(localStore, conn, options);
conn.to.flush();
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);

BuildResult result;

Expand Down Expand Up @@ -498,24 +466,40 @@ void State::buildRemote(ref<Store> destStore,
});

Machine::Connection conn {
.from = child.from.get(),
.to = child.to.get(),
.machine = machine,
{
.to = child.to.get(),
.from = child.from.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};

Finally updateStats([&]() {
bytesReceived += conn.from.read;
bytesSent += conn.to.written;
});

constexpr ServeProto::Version our_version = 0x206;

try {
build_remote::handshake(conn, buildOptions.nrRepeats);
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->sshName);
} catch (EndOfFile & e) {
child.pid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s);
}

// Do not attempt to speak a newer version of the protocol.
//
// Per https://github.com/NixOS/nix/issues/9584 should be handled as
// part of `handshake` in upstream nix.
conn.remoteVersion = std::min(conn.remoteVersion, our_version);

{
auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0;
Expand Down
23 changes: 2 additions & 21 deletions src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "sync.hh"
#include "nar-extractor.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"


typedef unsigned int BuildID;
Expand Down Expand Up @@ -302,29 +303,9 @@ struct Machine
}

// A connection to a machine
struct Connection {
nix::FdSource from;
nix::FdSink to;
nix::ServeProto::Version remoteVersion;

struct Connection : nix::ServeProto::BasicClientConnection {
// Backpointer to the machine
ptr machine;

operator nix::ServeProto::ReadConn ()
{
return {
.from = from,
.version = remoteVersion,
};
}

operator nix::ServeProto::WriteConn ()
{
return {
.to = to,
.version = remoteVersion,
};
}
};
};

Expand Down
Loading