Merge pull request IQSS#8612 from GlobalDataverseCommunityConsortium/GDCC/8604-improve_archiver_error_handling

GDCC/8604 Improve archiver error handling
kcondon authored Jul 18, 2022
2 parents 86f69bd + 3d01b13 commit 567e506
Showing 4 changed files with 195 additions and 157 deletions.
pom.xml (4 changes: 2 additions & 2 deletions)
@@ -463,7 +463,7 @@
         <dependency>
             <groupId>org.duracloud</groupId>
             <artifactId>common</artifactId>
-            <version>7.1.0</version>
+            <version>7.1.1</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
@@ -478,7 +478,7 @@
         <dependency>
             <groupId>org.duracloud</groupId>
             <artifactId>storeclient</artifactId>
-            <version>7.1.0</version>
+            <version>7.1.1</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java
@@ -1,7 +1,6 @@
package edu.harvard.iq.dataverse.engine.command.impl;

import edu.harvard.iq.dataverse.DatasetVersion;
import edu.harvard.iq.dataverse.DvObject;
import edu.harvard.iq.dataverse.authorization.Permission;
import edu.harvard.iq.dataverse.authorization.users.ApiToken;
import edu.harvard.iq.dataverse.authorization.users.AuthenticatedUser;
@@ -12,9 +11,13 @@
import edu.harvard.iq.dataverse.engine.command.exception.CommandException;
import edu.harvard.iq.dataverse.settings.SettingsServiceBean;
import edu.harvard.iq.dataverse.util.bagit.BagGenerator;
import edu.harvard.iq.dataverse.util.bagit.OREMap;
import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult;

import java.util.Date;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.security.DigestInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
@@ -24,7 +27,9 @@ public abstract class AbstractSubmitToArchiveCommand extends AbstractCommand<Dat

private final DatasetVersion version;
private final Map<String, String> requestedSettings = new HashMap<String, String>();
protected boolean success=false;
private static final Logger logger = Logger.getLogger(AbstractSubmitToArchiveCommand.class.getName());
private static final int MAX_ZIP_WAIT = 20000;
private static final int DEFAULT_THREADS = 2;

public AbstractSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) {
@@ -87,4 +92,70 @@ public String describe() {
+ version.getFriendlyVersionNumber()+")]";
}

public Thread startBagThread(DatasetVersion dv, PipedInputStream in, DigestInputStream digestInputStream2,
String dataciteXml, ApiToken token) throws IOException, InterruptedException {
Thread bagThread = new Thread(new Runnable() {
public void run() {
try (PipedOutputStream out = new PipedOutputStream(in)) {
// Generate bag
BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml);
bagger.setNumConnections(getNumberOfBagGeneratorThreads());
bagger.setAuthenticationKey(token.getTokenString());
bagger.generateBag(out);
success = true;
} catch (Exception e) {
logger.severe("Error creating bag: " + e.getMessage());
e.printStackTrace();
try {
digestInputStream2.close();
} catch (Exception ex) {
logger.warning(ex.getLocalizedMessage());
}
throw new RuntimeException("Error creating bag: " + e.getMessage());
}
}
});
bagThread.start();
/*
* The following loop handles two issues. First, with no delay, the
* bucket.create() call below can get started before the piped streams are set
* up, causing a failure (seen when triggered in a PostPublishDataset workflow).
* A minimal initial wait, e.g. until some bytes are available, would address
* this. Second, the BagGenerator class, due to its use of parallel streaming
* creation of the zip file, has the characteristic that it makes a few bytes
* available - from setting up the directory structure for the zip file -
* significantly earlier than it is ready to stream file content (e.g. for
* thousands of files and GB of content). If, for these large datasets,
* the transfer is started as soon as bytes are available, the call can
* time out before the bytes for all the zipped files are available. To manage
* this, the loop waits until 90K bytes are available, larger than any expected
* dir structure for the zip and implying that the main zipped content is
* available, or until the thread terminates, with all of its content written to
* the pipe. (Note the PipedInputStream buffer is set at 100K above - I didn't
* want to test whether that means that exactly 100K bytes will be available()
* for large datasets or not, so the test below is at 90K.)
*
* An additional sanity check limits the wait to 20K (MAX_ZIP_WAIT) seconds. The BagGenerator
* has been used to archive >120K files, 2K directories, and ~600GB of files on the
* SEAD project (streaming content to disk rather than over an internet
* connection) which would take longer than 20K seconds (even 10+ hours) and might
* produce an initial set of bytes for directories > 90K. If Dataverse ever
* needs to support datasets of this size, the numbers here would need to be
* increased, and/or a change in how archives are sent to Google (e.g. as
* multiple blobs that get aggregated) would be required.
*/
int i = 0;
while (digestInputStream2.available() <= 90000 && i < MAX_ZIP_WAIT && bagThread.isAlive()) {
Thread.sleep(1000);
logger.fine("avail: " + digestInputStream2.available() + " : " + bagThread.getState().toString());
i++;
}
logger.fine("Bag: transfer started, i=" + i + ", avail = " + digestInputStream2.available());
if (i == MAX_ZIP_WAIT) {
throw new IOException("Stream not available");
}
return bagThread;
}

}
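
For context, the sketch below shows roughly how a concrete SubmitToArchiveCommand subclass might drive startBagThread. It is illustrative only and not part of this diff: the archiveBag wrapper and performUpload helper are hypothetical names, and the 100K pipe buffer simply mirrors the value referenced in the comment above.

// Hypothetical caller inside a concrete subclass (illustrative sketch, not code from this PR).
// Assumed imports at the top of that subclass:
//   edu.harvard.iq.dataverse.DatasetVersion, edu.harvard.iq.dataverse.authorization.users.ApiToken,
//   java.io.IOException, java.io.PipedInputStream, java.math.BigInteger,
//   java.security.DigestInputStream, java.security.MessageDigest
private void archiveBag(DatasetVersion dv, String dataciteXml, ApiToken token) throws Exception {
    PipedInputStream in = new PipedInputStream(100000); // 100K buffer, matching the 90K wait threshold above
    MessageDigest messageDigest = MessageDigest.getInstance("MD5");
    try (DigestInputStream digestInputStream = new DigestInputStream(in, messageDigest)) {
        // Returns once enough of the zipped bag is buffered (or the generator thread has finished).
        Thread bagThread = startBagThread(dv, in, digestInputStream, dataciteXml, token);
        performUpload(digestInputStream); // assumed archive-specific transfer that drains the stream
        bagThread.join(5000); // allow the generator to finish and set the success flag
        if (!success) {
            throw new IOException("Bag generation failed for version " + dv.getFriendlyVersionNumber());
        }
        // The digest now reflects everything that was streamed; it could be recorded with the archived copy.
        String checksum = new BigInteger(1, messageDigest.digest()).toString(16);
    }
}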