
Commit

Merge pull request #56 from trocco-io/revert-55-revert-54-retry-upload-files

[bugfix]Add file upload retry handling
u110 committed Dec 22, 2022
2 parents ad48ce5 + 279c0bf commit 77b1e56
Showing 5 changed files with 56 additions and 14 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -51,7 +51,7 @@ dependencies {
     testImplementation "org.embulk:embulk-parser-csv:0.10.31"

     compile "org.embulk:embulk-output-jdbc:0.10.2"
-    compile "net.snowflake:snowflake-jdbc:3.13.14"
+    compile "net.snowflake:snowflake-jdbc:3.13.26"
 }
 embulkPlugin {
     mainClass = "org.embulk.output.SnowflakeOutputPlugin"
2 changes: 1 addition & 1 deletion gradle/dependency-locks/embulkPluginRuntime.lockfile
@@ -6,7 +6,7 @@ com.fasterxml.jackson.core:jackson-core:2.6.7
 com.fasterxml.jackson.core:jackson-databind:2.6.7
 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7
 javax.validation:validation-api:1.1.0.Final
-net.snowflake:snowflake-jdbc:3.13.14
+net.snowflake:snowflake-jdbc:3.13.26
 org.embulk:embulk-output-jdbc:0.10.2
 org.embulk:embulk-util-config:0.3.0
 org.embulk:embulk-util-json:0.1.1
9 changes: 9 additions & 0 deletions makefile
@@ -0,0 +1,9 @@
+build: gradlew/build
+gem: gradlew/gem
+check: gradlew/check
+lint-auto: gradlew/spotlessApply
+test: gradlew/test
+update-dependencies:
+	./gradlew dependencies --write-locks
+gradlew/%:
+	./gradlew $(@F)
10 changes: 9 additions & 1 deletion src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
@@ -51,6 +51,10 @@ public interface SnowflakePluginTask extends PluginTask {
     @Config("delete_stage")
     @ConfigDefault("false")
     public boolean getDeleteStage();
+
+    @Config("max_upload_retries")
+    @ConfigDefault("3")
+    public int getMaxUploadRetries();
   }

   @Override
@@ -145,7 +149,11 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
       snowflakeCon.runCreateStage(this.stageIdentifier);
     }

-    return new SnowflakeCopyBatchInsert(getConnector(task, true), this.stageIdentifier, false);
+    return new SnowflakeCopyBatchInsert(
+        getConnector(task, true),
+        this.stageIdentifier,
+        false,
+        ((SnowflakePluginTask) task).getMaxUploadRetries());
   }

   @Override
47 changes: 36 additions & 11 deletions src/main/java/org/embulk/output/SnowflakeCopyBatchInsert.java
@@ -26,6 +26,7 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
   protected static final String nullString = "\\N";
   protected static final String newLineString = "\n";
   protected static final String delimiterString = "\t";
+  private final int maxUploadRetries;

   private SnowflakeOutputConnection connection = null;
   private TableIdentifier tableIdentifier = null;
@@ -39,7 +40,10 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
   private List<Future<Void>> uploadAndCopyFutures;

   public SnowflakeCopyBatchInsert(
-      JdbcOutputConnector connector, StageIdentifier stageIdentifier, boolean deleteStageFile)
+      JdbcOutputConnector connector,
+      StageIdentifier stageIdentifier,
+      boolean deleteStageFile,
+      int maxUploadRetries)
       throws IOException {
     this.index = 0;
     openNewFile();
@@ -48,6 +52,7 @@ public SnowflakeCopyBatchInsert(
     this.executorService = Executors.newCachedThreadPool();
     this.deleteStageFile = deleteStageFile;
     this.uploadAndCopyFutures = new ArrayList();
+    this.maxUploadRetries = maxUploadRetries;
   }

   @Override
@@ -251,7 +256,7 @@ public void flush() throws IOException, SQLException {
     String snowflakeStageFileName = "embulk_snowflake_" + SnowflakeUtils.randomString(8);

     UploadTask uploadTask =
-        new UploadTask(file, batchRows, stageIdentifier, snowflakeStageFileName);
+        new UploadTask(file, batchRows, stageIdentifier, snowflakeStageFileName, maxUploadRetries);
     Future<Void> uploadFuture = executorService.submit(uploadTask);
     uploadAndCopyFutures.add(uploadFuture);

@@ -330,28 +335,48 @@ private class UploadTask implements Callable<Void> {
     private final int batchRows;
     private final String snowflakeStageFileName;
     private final StageIdentifier stageIdentifier;
+    private final int maxUploadRetries;

     public UploadTask(
-        File file, int batchRows, StageIdentifier stageIdentifier, String snowflakeStageFileName) {
+        File file,
+        int batchRows,
+        StageIdentifier stageIdentifier,
+        String snowflakeStageFileName,
+        int maxUploadRetries) {
       this.file = file;
       this.batchRows = batchRows;
       this.snowflakeStageFileName = snowflakeStageFileName;
       this.stageIdentifier = stageIdentifier;
+      this.maxUploadRetries = maxUploadRetries;
     }

-    public Void call() throws IOException, SQLException {
-      logger.info(
-          String.format(
-              "Uploading file id %s to Snowflake (%,d bytes %,d rows)",
-              snowflakeStageFileName, file.length(), batchRows));
-
+    public Void call() throws IOException, SQLException, InterruptedException {
+      int retries = 0;
       try {
         long startTime = System.currentTimeMillis();
         // put file to snowflake internal storage
         SnowflakeOutputConnection con = (SnowflakeOutputConnection) connector.connect(true);

-        FileInputStream fileInputStream = new FileInputStream(file);
-        con.runUploadFile(stageIdentifier, snowflakeStageFileName, fileInputStream);
+        while (true) {
+          try {
+            logger.info(
+                String.format(
+                    "Uploading file id %s to Snowflake (%,d bytes %,d rows)",
+                    snowflakeStageFileName, file.length(), batchRows));
+            FileInputStream fileInputStream = new FileInputStream(file);
+            con.runUploadFile(stageIdentifier, snowflakeStageFileName, fileInputStream);
+            break;
+          } catch (SQLException e) {
+            retries++;
+            if (retries > this.maxUploadRetries) {
+              throw e;
+            }
+            logger.warn(
+                String.format(
+                    "Upload error %s file %s retries: %d", e, snowflakeStageFileName, retries));
+            Thread.sleep(retries * retries * 1000);
+          }
+        }

         double seconds = (System.currentTimeMillis() - startTime) / 1000.0;

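For readers skimming the change above: when the PUT to the internal stage throws a SQLException, the upload is now re-attempted up to max_upload_retries times (default 3 via @ConfigDefault("3")), sleeping retries * retries seconds between attempts (1 s, 4 s, then 9 s). The following is a minimal standalone sketch of that quadratic-backoff pattern under an assumed Uploader interface; RetryUploadSketch, Uploader, and uploadWithRetries are illustrative names only and are not part of the plugin.

import java.sql.SQLException;

public class RetryUploadSketch {
  /** Hypothetical stand-in for the PUT-to-stage call; not the plugin's real API. */
  interface Uploader {
    void upload() throws SQLException;
  }

  /**
   * Re-attempts an upload on SQLException up to maxRetries times, sleeping
   * retries * retries seconds between attempts (1 s, 4 s, 9 s, ...), mirroring
   * the loop added to UploadTask.call() in this commit.
   */
  static void uploadWithRetries(Uploader uploader, int maxRetries)
      throws SQLException, InterruptedException {
    int retries = 0;
    while (true) {
      try {
        uploader.upload();
        return; // success
      } catch (SQLException e) {
        retries++;
        if (retries > maxRetries) {
          throw e; // give up; rethrow the original error to the caller
        }
        Thread.sleep(retries * retries * 1000L);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Demo: fail twice, then succeed on the third attempt.
    int[] calls = {0};
    uploadWithRetries(
        () -> {
          if (calls[0]++ < 2) {
            throw new SQLException("simulated transient upload failure");
          }
        },
        3);
    System.out.println("upload succeeded after " + calls[0] + " attempts");
  }
}

The commit keeps the retry as simple blocking backoff on the upload worker thread; only the PUT step is wrapped in the loop (the subsequent COPY is not changed in this diff), and once retries exceed the configured maximum the original SQLException is rethrown to the caller. Users hitting transient stage-upload errors can raise max_upload_retries in the snowflake output section of their Embulk config.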
