Commit

Add permission checks before reading from HDFS stream (#26716)
Add checks for special permissions before reading HDFS stream data. Also adds a test for the
read-only repository fix: MiniHDFS now starts with an existing repository containing a single
snapshot, and the test registers a read-only repository pointing at it and attempts to list the
snapshots it contains.
jbaiera committed Sep 21, 2017
1 parent a230d2a commit cab0023
Showing 8 changed files with 111 additions and 16 deletions.
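At its core, the change threads an HdfsSecurityContext through to HdfsBlobContainer so that lazily-connecting stream operations (read, skip) execute inside a doPrivileged block limited to an explicit set of permissions. The sketch below illustrates that pattern only; the class name and the way permissions are handed in are made up for the example, and the plugin additionally calls SpecialPermission.check() before entering the privileged block.

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.AccessController;
import java.security.Permission;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

// Sketch: wraps a stream whose read()/skip() may open connections lazily, so
// those calls run with no more than the supplied permissions.
class RestrictedPrivilegedInputStream extends FilterInputStream {

    private final Permission[] restrictedPermissions;

    RestrictedPrivilegedInputStream(InputStream in, Permission[] restrictedPermissions) {
        super(in);
        this.restrictedPermissions = restrictedPermissions;
    }

    @Override
    public int read() throws IOException {
        return doPrivilegedOrThrow(in::read);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return doPrivilegedOrThrow(() -> in.read(b, off, len));
    }

    @Override
    public long skip(long n) throws IOException {
        return doPrivilegedOrThrow(() -> in.skip(n));
    }

    private <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
        try {
            // The three-argument form restricts the privileges asserted inside the
            // block to at most the listed permissions.
            return AccessController.doPrivileged(action, null, restrictedPermissions);
        } catch (PrivilegedActionException e) {
            // The wrapped actions only throw IOException.
            throw (IOException) e.getCause();
        }
    }
}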
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
@@ -45,12 +46,14 @@

final class HdfsBlobContainer extends AbstractBlobContainer {
private final HdfsBlobStore store;
private final HdfsSecurityContext securityContext;
private final Path path;
private final int bufferSize;

HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore store, Path path, int bufferSize) {
HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore store, Path path, int bufferSize, HdfsSecurityContext hdfsSecurityContext) {
super(blobPath);
this.store = store;
this.securityContext = hdfsSecurityContext;
this.path = path;
this.bufferSize = bufferSize;
}
@@ -90,7 +93,9 @@ public InputStream readBlob(String blobName) throws IOException {
// FSDataInputStream can open connections on read() or skip() so we wrap in
// HDFSPrivilegedInputSteam which will ensure that underlying methods will
// be called with the proper privileges.
return store.execute(fileContext -> new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize)));
return store.execute(fileContext ->
new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize), securityContext)
);
}

@Override
@@ -144,8 +149,11 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
*/
private static class HDFSPrivilegedInputSteam extends FilterInputStream {

HDFSPrivilegedInputSteam(InputStream in) {
private final HdfsSecurityContext securityContext;

HDFSPrivilegedInputSteam(InputStream in, HdfsSecurityContext hdfsSecurityContext) {
super(in);
this.securityContext = hdfsSecurityContext;
}

public int read() throws IOException {
@@ -175,9 +183,10 @@ public synchronized void reset() throws IOException {
});
}

private static <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
private <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
SpecialPermission.check();
try {
return AccessController.doPrivileged(action);
return AccessController.doPrivileged(action, null, securityContext.getRestrictedExecutionPermissions());
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
@@ -75,7 +75,7 @@ public String toString() {

@Override
public BlobContainer blobContainer(BlobPath path) {
return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize);
return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize, securityContext);
}

private Path buildHdfsPath(BlobPath blobPath) {
@@ -132,7 +132,7 @@ private FileContext createContext(URI uri, Settings repositorySettings) {
hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true);

// Create the filecontext with our user information
// This will correctly configure the filecontext to have our UGI as it's internal user.
// This will correctly configure the filecontext to have our UGI as its internal user.
return ugi.doAs((PrivilegedAction<FileContext>) () -> {
try {
AbstractFileSystem fs = AbstractFileSystem.get(uri, hadoopConfiguration);
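For context on the doAs pattern in the hunk above, here is a compact, self-contained sketch. It assumes the URI, Hadoop Configuration, and UserGroupInformation are prepared elsewhere, and the holder class and method names are made up:

import java.net.URI;
import java.security.PrivilegedAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.security.UserGroupInformation;

final class FileContextFactory {

    // Creating the FileContext inside ugi.doAs makes the UGI the context's
    // internal user, so later filesystem operations are attributed to it.
    static FileContext createContextAs(UserGroupInformation ugi, URI uri, Configuration conf) {
        return ugi.doAs((PrivilegedAction<FileContext>) () -> {
            try {
                AbstractFileSystem fs = AbstractFileSystem.get(uri, conf);
                return FileContext.getFileContext(fs, conf);
            } catch (Exception e) {
                throw new RuntimeException("could not create FileContext for " + uri, e);
            }
        });
    }
}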
@@ -56,7 +56,9 @@ class HdfsSecurityContext {
// 1) hadoop dynamic proxy is messy with access rules
new ReflectPermission("suppressAccessChecks"),
// 2) allow hadoop to add credentials to our Subject
new AuthPermission("modifyPrivateCredentials")
new AuthPermission("modifyPrivateCredentials"),
// 3) RPC Engine requires this for re-establishing pooled connections over the lifetime of the client
new PrivateCredentialPermission("org.apache.hadoop.security.Credentials * \"*\"", "read")
};

// If Security is enabled, we need all the following elevated permissions:
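The permissions above are what HdfsSecurityContext hands back as its restricted execution permissions, i.e. the third argument of the limited doPrivileged call in HdfsBlobContainer. A hedged sketch of assembling such a set, with a made-up holder class and the entries mirroring the hunk:

import java.lang.reflect.ReflectPermission;
import java.security.Permission;
import javax.security.auth.AuthPermission;
import javax.security.auth.PrivateCredentialPermission;

// Illustrative holder only, not the plugin's HdfsSecurityContext.
final class SimpleAuthPermissions {

    static Permission[] restrictedExecutionPermissions() {
        return new Permission[] {
            // 1) hadoop's dynamic proxies need to suppress access checks
            new ReflectPermission("suppressAccessChecks"),
            // 2) hadoop adds credentials to our Subject
            new AuthPermission("modifyPrivateCredentials"),
            // 3) the RPC engine re-reads pooled connection credentials over the client's lifetime
            new PrivateCredentialPermission("org.apache.hadoop.security.Credentials * \"*\"", "read")
        };
    }
}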
@@ -0,0 +1,29 @@
# Integration tests for HDFS Repository plugin
#
# Tests retrieving information about snapshots
#
---
"Get a snapshot - readonly":
# Create repository
- do:
snapshot.create_repository:
repository: test_snapshot_repository_ro
body:
type: hdfs
settings:
uri: "hdfs://localhost:9999"
path: "/user/elasticsearch/existing/readonly-repository"
readonly: true

# List snapshot info
- do:
snapshot.get:
repository: test_snapshot_repository_ro
snapshot: "_all"

- length: { snapshots: 1 }

# Remove our repository
- do:
snapshot.delete_repository:
repository: test_snapshot_repository_ro
@@ -0,0 +1,31 @@
# Integration tests for HDFS Repository plugin
#
# Tests retrieving information about snapshots
#
---
"Get a snapshot - readonly":
# Create repository
- do:
snapshot.create_repository:
repository: test_snapshot_repository_ro
body:
type: hdfs
settings:
uri: "hdfs://localhost:9998"
path: "/user/elasticsearch/existing/readonly-repository"
security:
principal: "elasticsearch@BUILD.ELASTIC.CO"
readonly: true

# List snapshot info
- do:
snapshot.get:
repository: test_snapshot_repository_ro
snapshot: "_all"

- length: { snapshots: 1 }

# Remove our repository
- do:
snapshot.delete_repository:
repository: test_snapshot_repository_ro
40 changes: 32 additions & 8 deletions test/fixtures/hdfs-fixture/src/main/java/hdfs/MiniHDFS.java
@@ -19,7 +19,9 @@

package hdfs;

import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -29,9 +31,11 @@
import java.util.Arrays;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
@@ -100,15 +104,35 @@ public static void main(String[] args) throws Exception {
}
MiniDFSCluster dfs = builder.build();

// Set the elasticsearch user directory up
if (UserGroupInformation.isSecurityEnabled()) {
FileSystem fs = dfs.getFileSystem();
org.apache.hadoop.fs.Path esUserPath = new org.apache.hadoop.fs.Path("/user/elasticsearch");
// Configure contents of the filesystem
org.apache.hadoop.fs.Path esUserPath = new org.apache.hadoop.fs.Path("/user/elasticsearch");
try (FileSystem fs = dfs.getFileSystem()) {

// Set the elasticsearch user directory up
fs.mkdirs(esUserPath);
List<AclEntry> acls = new ArrayList<>();
acls.add(new AclEntry.Builder().setType(AclEntryType.USER).setName("elasticsearch").setPermission(FsAction.ALL).build());
fs.modifyAclEntries(esUserPath, acls);
fs.close();
if (UserGroupInformation.isSecurityEnabled()) {
List<AclEntry> acls = new ArrayList<>();
acls.add(new AclEntry.Builder().setType(AclEntryType.USER).setName("elasticsearch").setPermission(FsAction.ALL).build());
fs.modifyAclEntries(esUserPath, acls);
}

// Install a pre-existing repository into HDFS
String directoryName = "readonly-repository";
String archiveName = directoryName + ".tar.gz";
URL readOnlyRepositoryArchiveURL = MiniHDFS.class.getClassLoader().getResource(archiveName);
if (readOnlyRepositoryArchiveURL != null) {
Path tempDirectory = Files.createTempDirectory(MiniHDFS.class.getName());
File readOnlyRepositoryArchive = tempDirectory.resolve(archiveName).toFile();
FileUtils.copyURLToFile(readOnlyRepositoryArchiveURL, readOnlyRepositoryArchive);
FileUtil.unTar(readOnlyRepositoryArchive, tempDirectory.toFile());

fs.copyFromLocalFile(true, true,
new org.apache.hadoop.fs.Path(tempDirectory.resolve(directoryName).toAbsolutePath().toUri()),
esUserPath.suffix("/existing/" + directoryName)
);

FileUtils.deleteDirectory(tempDirectory.toFile());
}
}

// write our PID file
Binary file not shown.
