Add HDFS searchable snapshot integration (elastic#66185)
Adds a bounded read implementation to the HDFS blob store, as well as integration tests in the searchable snapshots project that verify functionality against both Kerberos and simple authentication HDFS.
jbaiera committed Dec 14, 2020
1 parent 3dddf72 commit 553d046
Showing 4 changed files with 275 additions and 2 deletions.
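With this change, callers can read an arbitrary byte range of a blob instead of streaming the whole object. A minimal usage sketch against the new API, assuming an HDFS-backed BlobContainer; the readRange helper, blob name, and offsets below are illustrative only, not part of the commit:

import java.io.IOException;
import java.io.InputStream;
import org.elasticsearch.common.blobstore.BlobContainer;

public class RangedReadExample {
    // Hypothetical helper: fetches bytes [pos, pos + len) of a blob via the
    // bounded readBlob(name, position, length) overload added by this commit.
    static byte[] readRange(BlobContainer container, String name, long pos, long len) throws IOException {
        try (InputStream in = container.readBlob(name, pos, len)) {
            return in.readAllBytes(); // at most len bytes; the stream ends at the range boundary
        }
    }
}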
@@ -19,6 +19,7 @@
package org.elasticsearch.repositories.hdfs;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@@ -33,6 +34,7 @@
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;

import java.io.FileNotFoundException;
@@ -112,8 +114,23 @@ public InputStream readBlob(String blobName) throws IOException {
    }

    @Override
-   public InputStream readBlob(String blobName, long position, long length) {
-       throw new UnsupportedOperationException();
+   public InputStream readBlob(String blobName, long position, long length) throws IOException {
+       // FSDataInputStream does buffering internally
+       // FSDataInputStream can open connections on read() or skip() so we wrap in
+       // HDFSPrivilegedInputSteam which will ensure that underlying methods will
+       // be called with the proper privileges.
+       try {
+           return store.execute(fileContext -> {
+               FSDataInputStream fsInput = fileContext.open(new Path(path, blobName), bufferSize);
+               // As long as no read operations have happened yet on the stream, seeking
+               // should direct the datanode to start on the appropriate block, at the
+               // appropriate target position.
+               fsInput.seek(position);
+               return Streams.limitStream(new HDFSPrivilegedInputSteam(fsInput, securityContext), length);
+           });
+       } catch (FileNotFoundException fnfe) {
+           throw new NoSuchFileException("[" + blobName + "] blob not found");
+       }
    }

@Override
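The bounded read above leans on two helpers: HDFSPrivilegedInputSteam, which runs the underlying stream operations inside the repository's security context, and Streams.limitStream, which caps the stream at the requested length. A rough sketch of the length-capping semantics, for readers unfamiliar with it; this is an illustration only, not the actual Elasticsearch utility:

import java.io.IOException;
import java.io.InputStream;

// Wrap a stream so that at most `remaining` bytes can be read, then report EOF.
class LimitedInputStream extends InputStream {
    private final InputStream in;
    private long remaining;

    LimitedInputStream(InputStream in, long limit) {
        this.in = in;
        this.remaining = limit;
    }

    @Override
    public int read() throws IOException {
        if (remaining <= 0) {
            return -1; // requested range exhausted
        }
        int b = in.read();
        if (b != -1) {
            remaining--;
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (remaining <= 0) {
            return -1;
        }
        int n = in.read(buf, off, (int) Math.min(len, remaining));
        if (n > 0) {
            remaining -= n;
        }
        return n;
    }
}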
@@ -29,10 +29,14 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.CoreMatchers;

import javax.security.auth.Subject;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -41,6 +45,7 @@
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collections;

import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
@@ -133,4 +138,44 @@ public void testReadOnly() throws Exception {
        assertArrayEquals(readBlobFully(container, "foo", data.length), data);
        assertTrue(container.blobExists("foo"));
    }

    public void testReadRange() throws Exception {
        FileContext fileContext = createTestContext();
        // Constructor will not create dir if read only
        HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, true);
        FileContext.Util util = fileContext.util();
        Path root = fileContext.makeQualified(new Path("dir"));
        assertFalse(util.exists(root));
        BlobPath blobPath = BlobPath.cleanPath().add("path");

        // blobContainer() will not create path if read only
        hdfsBlobStore.blobContainer(blobPath);
        Path hdfsPath = root;
        for (String p : blobPath) {
            hdfsPath = new Path(hdfsPath, p);
        }
        assertFalse(util.exists(hdfsPath));

        // if not read only, directory will be created
        hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, false);
        assertTrue(util.exists(root));
        BlobContainer container = hdfsBlobStore.blobContainer(blobPath);
        assertTrue(util.exists(hdfsPath));

        byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
        writeBlob(container, "foo", new BytesArray(data), randomBoolean());
        int pos = randomIntBetween(0, data.length / 2);
        int len = randomIntBetween(pos, data.length) - pos;
        assertArrayEquals(readBlobPartially(container, "foo", pos, len), Arrays.copyOfRange(data, pos, pos + len));
        assertTrue(container.blobExists("foo"));
    }

    public static byte[] readBlobPartially(BlobContainer container, String name, int pos, int length) throws IOException {
        byte[] data = new byte[length];
        try (InputStream inputStream = container.readBlob(name, pos, length)) {
            assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length));
            assertThat(inputStream.read(), CoreMatchers.equalTo(-1));
        }
        return data;
    }
}
x-pack/plugin/searchable-snapshots/qa/hdfs/build.gradle (new file, 173 additions)
@@ -0,0 +1,173 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

import org.apache.tools.ant.taskdefs.condition.Os
import org.elasticsearch.gradle.info.BuildParams
import org.elasticsearch.gradle.test.RestIntegTestTask

import java.nio.file.Files
import java.nio.file.Paths

import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE

apply plugin: 'elasticsearch.test.fixtures'
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
apply plugin: 'elasticsearch.rest-resources'

final Project hdfsFixtureProject = project(':test:fixtures:hdfs-fixture')
final Project krbFixtureProject = project(':test:fixtures:krb5kdc-fixture')
final Project hdfsRepoPluginProject = project(':plugins:repository-hdfs')

dependencies {
  testImplementation project(path: xpackModule('searchable-snapshots'), configuration: 'testArtifacts')
  testImplementation hdfsRepoPluginProject
}

restResources {
  restApi {
    includeCore 'indices', 'search', 'bulk', 'snapshot', 'nodes', '_common'
    includeXpack 'searchable_snapshots'
  }
}

testFixtures.useFixture(krbFixtureProject.path, 'hdfs-snapshot')

configurations {
  hdfsFixture
}

dependencies {
  hdfsFixture hdfsFixtureProject
  // Set the keytab files in the classpath so that we can access them from test code without the security manager freaking out.
  if (isEclipse == false) {
    testRuntimeOnly files(krbFixtureProject.ext.krb5Keytabs("hdfs-snapshot", "hdfs_hdfs.build.elastic.co.keytab").parent)
  }
}

normalization {
  runtimeClasspath {
    // ignore generated keytab files for the purposes of build avoidance
    ignore '*.keytab'
    // ignore fixture ports file which is on the classpath primarily to pacify the security manager
    ignore 'ports'
  }
}

String realm = "BUILD.ELASTIC.CO"
String krb5conf = krbFixtureProject.ext.krb5Conf("hdfs")

// Create HDFS File System Testing Fixtures
for (String fixtureName : ['hdfsFixture', 'secureHdfsFixture']) {
  def tsk = project.tasks.register(fixtureName, org.elasticsearch.gradle.test.AntFixture) {
    dependsOn project.configurations.hdfsFixture, krbFixtureProject.tasks.postProcessFixture
    executable = "${BuildParams.runtimeJavaHome}/bin/java"
    env 'CLASSPATH', "${-> project.configurations.hdfsFixture.asPath}"
    maxWaitInSeconds 60
    onlyIf { BuildParams.inFipsJvm == false }
    waitCondition = { fixture, ant ->
      // the hdfs.MiniHDFS fixture writes the ports file when
      // it's ready, so we can just wait for the file to exist
      return fixture.portsFile.exists()
    }
    final List<String> miniHDFSArgs = []

    // If it's a secure fixture, then depend on Kerberos Fixture and principals + add the krb5conf to the JVM options
    if (fixtureName.equals('secureHdfsFixture')) {
      miniHDFSArgs.add("-Djava.security.krb5.conf=${krb5conf}")
    }

    // Common options
    miniHDFSArgs.add('hdfs.MiniHDFS')
    miniHDFSArgs.add(baseDir)

    // If it's a secure fixture, then set the principal name and keytab locations to use for auth.
    if (fixtureName.equals('secureHdfsFixture')) {
      miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}")
      miniHDFSArgs.add(project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab"))
    }

    args miniHDFSArgs.toArray()
  }

  // TODO: The task configuration block has side effects that require it currently to be always executed.
  // Otherwise tests start failing. Therefore we enforce the task creation for now.
  tsk.get()
}

// Disable integration test if Fips mode
integTest {
  description = "Runs rest tests against an elasticsearch cluster with HDFS."
  systemProperty 'test.hdfs.uri', 'hdfs://localhost:9999'
  nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/simple'
  onlyIf { BuildParams.inFipsJvm == false }
}

task integTestSecure(type: RestIntegTestTask) {
  description = "Runs rest tests against an elasticsearch cluster with Secured HDFS."
  nonInputProperties.systemProperty 'test.hdfs.uri', 'hdfs://localhost:9998'
  nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/secure'
  nonInputProperties.systemProperty "test.krb5.principal.es", "elasticsearch@${realm}"
  nonInputProperties.systemProperty "test.krb5.principal.hdfs", "hdfs/hdfs.build.elastic.co@${realm}"
  nonInputProperties.systemProperty(
    "test.krb5.keytab.hdfs",
    project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab")
  )
  onlyIf { BuildParams.inFipsJvm == false }
}
check.dependsOn(integTestSecure)

testClusters.configureEach {
  testDistribution = 'DEFAULT'
  plugin(hdfsRepoPluginProject.path)
  setting 'xpack.license.self_generated.type', 'trial'
}

testClusters.integTestSecure {
  systemProperty "java.security.krb5.conf", krb5conf
  extraConfigFile(
    "repository-hdfs/krb5.keytab",
    file("${project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "elasticsearch.keytab")}"), IGNORE_VALUE
  )
}

// Determine HDFS Fixture compatibility for the current build environment.
boolean fixtureSupported = false
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
  // hdfs fixture will not start without hadoop native libraries on windows
  String nativePath = System.getenv("HADOOP_HOME")
  if (nativePath != null) {
    java.nio.file.Path path = Paths.get(nativePath)
    if (Files.isDirectory(path) &&
        Files.exists(path.resolve("bin").resolve("winutils.exe")) &&
        Files.exists(path.resolve("bin").resolve("hadoop.dll")) &&
        Files.exists(path.resolve("bin").resolve("hdfs.dll"))) {
      fixtureSupported = true
    } else {
      throw new IllegalStateException("HADOOP_HOME: ${path} is invalid, does not contain hadoop native libraries in \$HADOOP_HOME/bin")
    }
  }
} else {
  fixtureSupported = true
}

boolean legalPath = rootProject.rootDir.toString().contains(" ") == false
if (legalPath == false) {
  fixtureSupported = false
}

if (fixtureSupported) {
  integTest.dependsOn hdfsFixture
  integTestSecure.dependsOn secureHdfsFixture
} else {
  integTest.enabled = false
  integTestSecure.enabled = false
  if (legalPath) {
    logger.warn("hdfsFixture unsupported, please set HADOOP_HOME and put HADOOP_HOME\\bin in PATH")
  } else {
    logger.warn("hdfsFixture unsupported since there are spaces in the path: '" + rootProject.rootDir.toString() + "'")
  }
}
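The waitCondition in the fixture tasks above polls for the ports file that hdfs.MiniHDFS writes once it is listening. A standalone sketch of that readiness check; the path and timeout are assumptions for illustration, since in the build the polling is handled by the AntFixture machinery:

import java.nio.file.Files;
import java.nio.file.Path;

public class FixtureReadiness {
    // Poll until the fixture's ports file appears or the timeout elapses.
    static boolean waitForPortsFile(Path portsFile, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (Files.exists(portsFile)) {
                return true; // fixture wrote the file, so it is ready to accept connections
            }
            Thread.sleep(500); // avoid busy-waiting between checks
        }
        return false; // caller should fail fast if the fixture never came up
    }
}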
@@ -0,0 +1,38 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.searchablesnapshots.hdfs;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsRestTestCase;

import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.not;

public class HdfsSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestTestCase {
    @Override
    protected String repositoryType() {
        return "hdfs";
    }

    @Override
    protected Settings repositorySettings() {
        final String uri = System.getProperty("test.hdfs.uri");
        assertThat(uri, not(blankOrNullString()));

        final String path = System.getProperty("test.hdfs.path");
        assertThat(path, not(blankOrNullString()));

        // Optional based on type of test
        final String principal = System.getProperty("test.krb5.principal.es");

        Settings.Builder repositorySettings = Settings.builder().put("client", "searchable_snapshots").put("uri", uri).put("path", path);
        if (principal != null) {
            repositorySettings.put("security.principal", principal);
        }
        return repositorySettings.build();
    }
}
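These settings are presumably consumed by the shared base class when it registers the snapshot repository before running the searchable snapshot tests. For context, a hedged sketch of what such a registration amounts to over the REST API; the repository name, helper, and JSON body here are illustrative, not code from this commit:

import java.io.IOException;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class RepositoryRegistrationSketch {
    // Hypothetical: register an HDFS snapshot repository with the settings built above.
    static void registerRepository(RestClient client, String uri, String path) throws IOException {
        Request request = new Request("PUT", "/_snapshot/searchable_snapshots_repo");
        request.setJsonEntity(
            "{\"type\": \"hdfs\", \"settings\": {"
                + "\"client\": \"searchable_snapshots\","
                + "\"uri\": \"" + uri + "\","
                + "\"path\": \"" + path + "\"}}"
        );
        client.performRequest(request); // repository is now usable for snapshot / mount APIs
    }
}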
