Skip to content

Commit

Permalink
Use nix::serv_proto::BasicConnection in build_remote.cc
Browse files Browse the repository at this point in the history
This lays the foundation for being able to dedup the protocol code.
  • Loading branch information
Ericson2314 committed Feb 20, 2022
1 parent 514caac commit 5a995a8
Showing 1 changed file with 57 additions and 55 deletions.
112 changes: 57 additions & 55 deletions src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <fcntl.h>

#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh"
#include "util.hh"
#include "worker-protocol.hh"
Expand Down Expand Up @@ -105,7 +106,7 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil


static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore,
FdSource & from, FdSink & to, const StorePathSet & paths,
serve_proto::BasicConnection & conn, const StorePathSet & paths,
bool useSubstitutes = false)
{
StorePathSet closure;
Expand All @@ -116,13 +117,13 @@ static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore,
garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
to << cmdQueryValidPaths << 1 << useSubstitutes;
worker_proto::write(destStore, to, closure);
to.flush();
conn.to << cmdQueryValidPaths << 1 << useSubstitutes;
worker_proto::write(destStore, conn.to, closure);
conn.to.flush();

/* Get back the set of paths that are already valid on the remote
host. */
auto present = worker_proto::read(destStore, from, Phantom<StorePathSet> {});
auto present = worker_proto::read(destStore, conn.from, Phantom<StorePathSet> {});

if (present.size() == closure.size()) return;

Expand All @@ -137,11 +138,11 @@ static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore,
std::unique_lock<std::timed_mutex> sendLock(sendMutex,
std::chrono::seconds(600));

to << cmdImportPaths;
destStore.exportPaths(missing, to);
to.flush();
conn.to << cmdImportPaths;
destStore.exportPaths(missing, conn.to);
conn.to.flush();

if (readInt(from) != 1)
if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure");
}

Expand Down Expand Up @@ -223,28 +224,29 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */
});

FdSource from(child.from.get());
FdSink to(child.to.get());
serve_proto::BasicConnection conn {
.to = child.to.get(),
.from = child.from.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
};

Finally updateStats([&]() {
bytesReceived += from.read;
bytesSent += to.written;
bytesReceived += conn.from.read;
bytesSent += conn.to.written;
});

/* Handshake. */
unsigned int remoteVersion;

try {
to << SERVE_MAGIC_1 << 0x204;
to.flush();
conn.to << SERVE_MAGIC_1 << 0x204;
conn.to.flush();

unsigned int magic = readInt(from);
unsigned int magic = readInt(conn.from);
if (magic != SERVE_MAGIC_2)
throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", machine->sshName);
remoteVersion = readInt(from);
if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200)
conn.remoteVersion = readInt(conn.from);
if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200)
throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", machine->sshName);
if (GET_PROTOCOL_MINOR(remoteVersion) < 3 && repeats > 0)
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0)
throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", machine->sshName);

} catch (EndOfFile & e) {
Expand Down Expand Up @@ -308,7 +310,7 @@ void State::buildRemote(ref<Store> destStore,
destStore->computeFSClosure(inputs, closure);
copyPaths(*destStore, *localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else {
copyClosureTo(machine->state->sendLock, *destStore, from, to, inputs, true);
copyClosureTo(machine->state->sendLock, *destStore, conn, inputs, true);
}

auto now2 = std::chrono::steady_clock::now();
Expand All @@ -335,40 +337,40 @@ void State::buildRemote(ref<Store> destStore,

updateStep(ssBuilding);

to << cmdBuildDerivation << localStore->printStorePath(step->drvPath);
writeDerivation(to, *localStore, basicDrv);
to << maxSilentTime << buildTimeout;
if (GET_PROTOCOL_MINOR(remoteVersion) >= 2)
to << maxLogSize;
if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) {
to << repeats // == build-repeat
<< step->isDeterministic; // == enforce-determinism
conn.to << cmdBuildDerivation << localStore->printStorePath(step->drvPath);
writeDerivation(conn.to, *localStore, basicDrv);
conn.to << maxSilentTime << buildTimeout;
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2)
conn.to << maxLogSize;
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) {
conn.to << repeats // == build-repeat
<< step->isDeterministic; // == enforce-determinism
}
to.flush();
conn.to.flush();

result.startTime = time(0);
int res;
{
MaintainCount<counter> mc(nrStepsBuilding);
res = readInt(from);
res = readInt(conn.from);
}
result.stopTime = time(0);

result.errorMsg = readString(from);
if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) {
result.timesBuilt = readInt(from);
result.isNonDeterministic = readInt(from);
auto start = readInt(from);
auto stop = readInt(from);
result.errorMsg = readString(conn.from);
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) {
result.timesBuilt = readInt(conn.from);
result.isNonDeterministic = readInt(conn.from);
auto start = readInt(conn.from);
auto stop = readInt(conn.from);
if (start && start) {
/* Note: this represents the duration of a single
round, rather than all rounds. */
result.startTime = start;
result.stopTime = stop;
}
}
if (GET_PROTOCOL_MINOR(remoteVersion) >= 6) {
worker_proto::read(*localStore, from, Phantom<DrvOutputs> {});
if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) {
worker_proto::read(*localStore, conn.from, Phantom<DrvOutputs> {});
}
switch ((BuildResult::Status) res) {
case BuildResult::Built:
Expand Down Expand Up @@ -444,19 +446,19 @@ void State::buildRemote(ref<Store> destStore,
/* Get info about each output path. */
std::map<StorePath, ValidPathInfo> infos;
size_t totalNarSize = 0;
to << cmdQueryPathInfos;
worker_proto::write(*localStore, to, outputs);
to.flush();
conn.to << cmdQueryPathInfos;
worker_proto::write(*localStore, conn.to, outputs);
conn.to.flush();
while (true) {
auto storePathS = readString(from);
auto storePathS = readString(conn.from);
if (storePathS == "") break;
auto deriver = readString(from); // deriver
auto references = worker_proto::read(*localStore, from, Phantom<StorePathSet> {});
readLongLong(from); // download size
auto narSize = readLongLong(from);
auto narHash = Hash::parseAny(readString(from), htSHA256);
auto ca = parseContentAddressOpt(readString(from));
readStrings<StringSet>(from); // sigs
auto deriver = readString(conn.from); // deriver
auto references = worker_proto::read(*localStore, conn.from, Phantom<StorePathSet> {});
readLongLong(conn.from); // download size
auto narSize = readLongLong(conn.from);
auto narHash = Hash::parseAny(readString(conn.from), htSHA256);
auto ca = parseContentAddressOpt(readString(conn.from));
readStrings<StringSet>(conn.from); // sigs
ValidPathInfo info(localStore->parseStorePath(storePathS), narHash);
assert(outputs.count(info.path));
info.references = references;
Expand Down Expand Up @@ -495,10 +497,10 @@ void State::buildRemote(ref<Store> destStore,
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
to << cmdDumpStorePath << localStore->printStorePath(path);
to.flush();
conn.to << cmdDumpStorePath << localStore->printStorePath(path);
conn.to.flush();

TeeSource tee(from, sink);
TeeSource tee(conn.from, sink);
extractNarData(tee, localStore->printStorePath(path), narMembers);
});

Expand Down

0 comments on commit 5a995a8

Please sign in to comment.