From 553d046b7079af22f0bc1732af2aa52e74d5faae Mon Sep 17 00:00:00 2001
From: James Baiera
Date: Mon, 14 Dec 2020 16:04:41 -0500
Subject: [PATCH] Add HDFS searchable snapshot integration (#66185)

Adds a bounded read implementation on the HDFS blob store, as well as
integration tests in the searchable snapshots project that ensure the
functionality works on both Kerberos and simple authentication HDFS.
---
 .../repositories/hdfs/HdfsBlobContainer.java  |  21 ++-
 .../hdfs/HdfsBlobStoreContainerTests.java     |  45 +++++
 .../searchable-snapshots/qa/hdfs/build.gradle | 173 ++++++++++++++++++
 .../hdfs/HdfsSearchableSnapshotsIT.java       |  38 ++++
 4 files changed, 275 insertions(+), 2 deletions(-)
 create mode 100644 x-pack/plugin/searchable-snapshots/qa/hdfs/build.gradle
 create mode 100644 x-pack/plugin/searchable-snapshots/qa/hdfs/src/test/java/org/elasticsearch/xpack/searchablesnapshots/hdfs/HdfsSearchableSnapshotsIT.java

diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
index cf9328436b40..4703cdc296de 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
@@ -19,6 +19,7 @@ package org.elasticsearch.repositories.hdfs;
 
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -33,6 +34,7 @@ import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
 
 import java.io.FileNotFoundException;
@@ -112,8 +114,23 @@ public InputStream readBlob(String blobName) throws IOException {
     }
 
     @Override
-    public InputStream readBlob(String blobName, long position, long length) {
-        throw new UnsupportedOperationException();
+    public InputStream readBlob(String blobName, long position, long length) throws IOException {
+        // FSDataInputStream does buffering internally
+        // FSDataInputStream can open connections on read() or skip() so we wrap in
+        // HDFSPrivilegedInputSteam which will ensure that underlying methods will
+        // be called with the proper privileges.
+        try {
+            return store.execute(fileContext -> {
+                FSDataInputStream fsInput = fileContext.open(new Path(path, blobName), bufferSize);
+                // As long as no read operations have happened yet on the stream, seeking
+                // should direct the datanode to start on the appropriate block, at the
+                // appropriate target position.
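+                // (Seeking before any bytes are read should only reposition the stream,
+                // whereas a plain InputStream.skip() might read and discard bytes to
+                // advance to the target position.)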
+                fsInput.seek(position);
+                return Streams.limitStream(new HDFSPrivilegedInputSteam(fsInput, securityContext), length);
+            });
+        } catch (FileNotFoundException fnfe) {
+            throw new NoSuchFileException("[" + blobName + "] blob not found");
+        }
     }
 
     @Override
diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java
index 0c6e6f9044a2..7948a5f71526 100644
--- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java
+++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java
@@ -29,10 +29,14 @@ import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.CoreMatchers;
 
 import javax.security.auth.Subject;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -41,6 +45,7 @@ import java.security.PrivilegedAction;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Collections;
 
 import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
@@ -133,4 +138,44 @@ public void testReadOnly() throws Exception {
         assertArrayEquals(readBlobFully(container, "foo", data.length), data);
         assertTrue(container.blobExists("foo"));
     }
+
+    public void testReadRange() throws Exception {
+        FileContext fileContext = createTestContext();
+        // Constructor will not create dir if read only
+        HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, true);
+        FileContext.Util util = fileContext.util();
+        Path root = fileContext.makeQualified(new Path("dir"));
+        assertFalse(util.exists(root));
+        BlobPath blobPath = BlobPath.cleanPath().add("path");
+
+        // blobContainer() will not create path if read only
+        hdfsBlobStore.blobContainer(blobPath);
+        Path hdfsPath = root;
+        for (String p : blobPath) {
+            hdfsPath = new Path(hdfsPath, p);
+        }
+        assertFalse(util.exists(hdfsPath));
+
+        // if not read only, directory will be created
+        hdfsBlobStore = new HdfsBlobStore(fileContext, "dir", 1024, false);
+        assertTrue(util.exists(root));
+        BlobContainer container = hdfsBlobStore.blobContainer(blobPath);
+        assertTrue(util.exists(hdfsPath));
+
+        byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
+        writeBlob(container, "foo", new BytesArray(data), randomBoolean());
+        int pos = randomIntBetween(0, data.length / 2);
+        int len = randomIntBetween(pos, data.length) - pos;
+        assertArrayEquals(readBlobPartially(container, "foo", pos, len), Arrays.copyOfRange(data, pos, pos + len));
+        assertTrue(container.blobExists("foo"));
+    }
+
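+    // Reads [length] bytes of the named blob starting at [pos] via the ranged
+    // readBlob and asserts that the returned stream is exhausted once the
+    // requested range has been consumed.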
+    public static byte[] readBlobPartially(BlobContainer container, String name, int pos, int length) throws IOException {
+        byte[] data = new byte[length];
+        try (InputStream inputStream = container.readBlob(name, pos, length)) {
+            assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length));
+            assertThat(inputStream.read(), CoreMatchers.equalTo(-1));
+        }
+        return data;
+    }
 }
diff --git a/x-pack/plugin/searchable-snapshots/qa/hdfs/build.gradle b/x-pack/plugin/searchable-snapshots/qa/hdfs/build.gradle
new file mode 100644
index 000000000000..8936c61a89f3
--- /dev/null
+++ b/x-pack/plugin/searchable-snapshots/qa/hdfs/build.gradle
@@ -0,0 +1,173 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import org.apache.tools.ant.taskdefs.condition.Os
+import org.elasticsearch.gradle.info.BuildParams
+import org.elasticsearch.gradle.test.RestIntegTestTask
+
+import java.nio.file.Files
+import java.nio.file.Paths
+
+import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
+
+apply plugin: 'elasticsearch.test.fixtures'
+apply plugin: 'elasticsearch.standalone-rest-test'
+apply plugin: 'elasticsearch.rest-test'
+apply plugin: 'elasticsearch.rest-resources'
+
+final Project hdfsFixtureProject = project(':test:fixtures:hdfs-fixture')
+final Project krbFixtureProject = project(':test:fixtures:krb5kdc-fixture')
+final Project hdfsRepoPluginProject = project(':plugins:repository-hdfs')
+
+dependencies {
+  testImplementation project(path: xpackModule('searchable-snapshots'), configuration: 'testArtifacts')
+  testImplementation hdfsRepoPluginProject
+}
+
+restResources {
+  restApi {
+    includeCore 'indices', 'search', 'bulk', 'snapshot', 'nodes', '_common'
+    includeXpack 'searchable_snapshots'
+  }
+}
+
+testFixtures.useFixture(krbFixtureProject.path, 'hdfs-snapshot')
+
+configurations {
+  hdfsFixture
+}
+
+dependencies {
+  hdfsFixture hdfsFixtureProject
+  // Set the keytab files in the classpath so that we can access them from test code without the security manager freaking out.
+  if (isEclipse == false) {
+    testRuntimeOnly files(krbFixtureProject.ext.krb5Keytabs("hdfs-snapshot", "hdfs_hdfs.build.elastic.co.keytab").parent)
+  }
+}
+
+normalization {
+  runtimeClasspath {
+    // ignore generated keytab files for the purposes of build avoidance
+    ignore '*.keytab'
+    // ignore fixture ports file which is on the classpath primarily to pacify the security manager
+    ignore 'ports'
+  }
+}
+
+String realm = "BUILD.ELASTIC.CO"
+String krb5conf = krbFixtureProject.ext.krb5Conf("hdfs")
+
+// Create HDFS File System Testing Fixtures
+for (String fixtureName : ['hdfsFixture', 'secureHdfsFixture']) {
+  def tsk = project.tasks.register(fixtureName, org.elasticsearch.gradle.test.AntFixture) {
+    dependsOn project.configurations.hdfsFixture, krbFixtureProject.tasks.postProcessFixture
+    executable = "${BuildParams.runtimeJavaHome}/bin/java"
+    env 'CLASSPATH', "${-> project.configurations.hdfsFixture.asPath}"
+    maxWaitInSeconds 60
+    onlyIf { BuildParams.inFipsJvm == false }
+    waitCondition = { fixture, ant ->
+      // the hdfs.MiniHDFS fixture writes the ports file when
+      // it's ready, so we can just wait for the file to exist
+      return fixture.portsFile.exists()
+    }
+    final List miniHDFSArgs = []
+
+    // If it's a secure fixture, depend on the Kerberos fixture and principals, and add the krb5conf to the JVM options
+    if (fixtureName.equals('secureHdfsFixture')) {
+      miniHDFSArgs.add("-Djava.security.krb5.conf=${krb5conf}")
+    }
+
+    // Common options
+    miniHDFSArgs.add('hdfs.MiniHDFS')
+    miniHDFSArgs.add(baseDir)
+
+    // If it's a secure fixture, then set the principal name and keytab locations to use for auth.
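+    // (The hdfs.MiniHDFS fixture reads these two extra positional arguments as
+    // the Kerberos principal to run as and the keytab file to authenticate with.)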
+    if (fixtureName.equals('secureHdfsFixture')) {
+      miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}")
+      miniHDFSArgs.add(project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab"))
+    }
+
+    args miniHDFSArgs.toArray()
+  }
+
+  // TODO: The task configuration block has side effects that require it currently to be always executed.
+  // Otherwise tests start failing. Therefore we enforce the task creation for now.
+  tsk.get()
+}
+
+// Disable the integration tests when running in a FIPS JVM
+integTest {
+  description = "Runs rest tests against an elasticsearch cluster with HDFS."
+  systemProperty 'test.hdfs.uri', 'hdfs://localhost:9999'
+  nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/simple'
+  onlyIf { BuildParams.inFipsJvm == false }
+}
+
+task integTestSecure(type: RestIntegTestTask) {
+  description = "Runs rest tests against an elasticsearch cluster with Secured HDFS."
+  nonInputProperties.systemProperty 'test.hdfs.uri', 'hdfs://localhost:9998'
+  nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/secure'
+  nonInputProperties.systemProperty "test.krb5.principal.es", "elasticsearch@${realm}"
+  nonInputProperties.systemProperty "test.krb5.principal.hdfs", "hdfs/hdfs.build.elastic.co@${realm}"
+  nonInputProperties.systemProperty(
+    "test.krb5.keytab.hdfs",
+    project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab")
+  )
+  onlyIf { BuildParams.inFipsJvm == false }
+}
+check.dependsOn(integTestSecure)
+
+testClusters.configureEach {
+  testDistribution = 'DEFAULT'
+  plugin(hdfsRepoPluginProject.path)
+  setting 'xpack.license.self_generated.type', 'trial'
+}
+
+testClusters.integTestSecure {
+  systemProperty "java.security.krb5.conf", krb5conf
+  extraConfigFile(
+    "repository-hdfs/krb5.keytab",
+    file("${project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "elasticsearch.keytab")}"), IGNORE_VALUE
+  )
+}
+
+// Determine HDFS Fixture compatibility for the current build environment.
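+// The fixture needs the Hadoop native libraries on Windows and cannot start
+// from a project path containing spaces; in either unsupported case the
+// integration tests are disabled below.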
+boolean fixtureSupported = false
+if (Os.isFamily(Os.FAMILY_WINDOWS)) {
+  // hdfs fixture will not start without hadoop native libraries on windows
+  String nativePath = System.getenv("HADOOP_HOME")
+  if (nativePath != null) {
+    java.nio.file.Path path = Paths.get(nativePath)
+    if (Files.isDirectory(path) &&
+        Files.exists(path.resolve("bin").resolve("winutils.exe")) &&
+        Files.exists(path.resolve("bin").resolve("hadoop.dll")) &&
+        Files.exists(path.resolve("bin").resolve("hdfs.dll"))) {
+      fixtureSupported = true
+    } else {
+      throw new IllegalStateException("HADOOP_HOME: ${path} is invalid, does not contain hadoop native libraries in \$HADOOP_HOME/bin")
+    }
+  }
+} else {
+  fixtureSupported = true
+}
+
+boolean legalPath = rootProject.rootDir.toString().contains(" ") == false
+if (legalPath == false) {
+  fixtureSupported = false
+}
+
+if (fixtureSupported) {
+  integTest.dependsOn hdfsFixture
+  integTestSecure.dependsOn secureHdfsFixture
+} else {
+  integTest.enabled = false
+  integTestSecure.enabled = false
+  if (legalPath) {
+    logger.warn("hdfsFixture unsupported, please set HADOOP_HOME and put HADOOP_HOME\\bin in PATH")
+  } else {
+    logger.warn("hdfsFixture unsupported since there are spaces in the path: '" + rootProject.rootDir.toString() + "'")
+  }
+}
diff --git a/x-pack/plugin/searchable-snapshots/qa/hdfs/src/test/java/org/elasticsearch/xpack/searchablesnapshots/hdfs/HdfsSearchableSnapshotsIT.java b/x-pack/plugin/searchable-snapshots/qa/hdfs/src/test/java/org/elasticsearch/xpack/searchablesnapshots/hdfs/HdfsSearchableSnapshotsIT.java
new file mode 100644
index 000000000000..5f62fba2ed7a
--- /dev/null
+++ b/x-pack/plugin/searchable-snapshots/qa/hdfs/src/test/java/org/elasticsearch/xpack/searchablesnapshots/hdfs/HdfsSearchableSnapshotsIT.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.searchablesnapshots.hdfs;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsRestTestCase;
+
+import static org.hamcrest.Matchers.blankOrNullString;
+import static org.hamcrest.Matchers.not;
+
+public class HdfsSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestTestCase {
+    @Override
+    protected String repositoryType() {
+        return "hdfs";
+    }
+
+    @Override
+    protected Settings repositorySettings() {
+        final String uri = System.getProperty("test.hdfs.uri");
+        assertThat(uri, not(blankOrNullString()));
+
+        final String path = System.getProperty("test.hdfs.path");
+        assertThat(path, not(blankOrNullString()));
+
+        // The Kerberos principal is optional; it is only set for the secure test run
+        final String principal = System.getProperty("test.krb5.principal.es");
+
+        Settings.Builder repositorySettings = Settings.builder().put("client", "searchable_snapshots").put("uri", uri).put("path", path);
+        if (principal != null) {
+            repositorySettings.put("security.principal", principal);
+        }
+        return repositorySettings.build();
+    }
+}