CDPD-12385 Add Hadoop File System extended attributes support. #9

Open · wants to merge 1 commit into base: HDP-3.1.5.1
1 change: 1 addition & 0 deletions gcs/CHANGES.txt
@@ -1,3 +1,4 @@
Backport Add Hadoop File System extended attributes support.
1.9.10 - 2018-11-01

1. Use Hadoop `CredentialProvider` API to retrieve proxy credentials.
13 changes: 13 additions & 0 deletions gcs/pom.xml
@@ -233,6 +233,7 @@
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
@@ -243,6 +244,18 @@
</sources>
</configuration>
</execution>
<execution>
<id>add-test-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/${hadoop.identifier}/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
@@ -14,6 +14,7 @@

package com.google.cloud.hadoop.fs.gcs;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem;
@@ -26,6 +27,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

@@ -81,4 +83,17 @@ public FSDataOutputStream createNonRecursive(
blockSize,
progress);
}

/** {@inheritDoc} */
@Override
public void setXAttr(Path path, String name, byte[] value, EnumSet<XAttrSetFlag> flags)
throws IOException {
checkArgument(flags != null && !flags.isEmpty(), "flags should not be null or empty");
boolean create = flags.contains(XAttrSetFlag.CREATE);
boolean replace = flags.contains(XAttrSetFlag.REPLACE);
setXAttrInternal(path, name, value, create, replace);
}

abstract void setXAttrInternal(
Path path, String name, byte[] value, boolean create, boolean replace) throws IOException;
}
@@ -14,6 +14,7 @@

package com.google.cloud.hadoop.fs.gcs;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem;
@@ -26,6 +27,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

@@ -81,4 +83,17 @@ public FSDataOutputStream createNonRecursive(
blockSize,
progress);
}

/** {@inheritDoc} */
@Override
public void setXAttr(Path path, String name, byte[] value, EnumSet<XAttrSetFlag> flags)
throws IOException {
checkArgument(flags != null && !flags.isEmpty(), "flags should not be null or empty");
boolean create = flags.contains(XAttrSetFlag.CREATE);
boolean replace = flags.contains(XAttrSetFlag.REPLACE);
setXAttrInternal(path, name, value, create, replace);
}

abstract void setXAttrInternal(
Path path, String name, byte[] value, boolean create, boolean replace) throws IOException;
}
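
Not part of the diff, for context only: a minimal sketch of how a client would exercise the new setXAttr/getXAttr support through the standard Hadoop FileSystem API. The bucket, object, and attribute names are placeholders, and it assumes the GCS connector and credentials are already configured.

import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;

public class SetXAttrExample {
  public static void main(String[] args) throws Exception {
    // Placeholder URI; assumes the gs:// scheme is wired to the GCS connector.
    Path path = new Path("gs://example-bucket/dir/file");
    try (FileSystem fs = path.getFileSystem(new Configuration())) {
      // CREATE fails if the attribute already exists.
      fs.setXAttr(path, "color", "blue".getBytes(StandardCharsets.UTF_8),
          EnumSet.of(XAttrSetFlag.CREATE));
      // REPLACE fails if the attribute does not exist yet.
      fs.setXAttr(path, "color", "red".getBytes(StandardCharsets.UTF_8),
          EnumSet.of(XAttrSetFlag.REPLACE));
      byte[] value = fs.getXAttr(path, "color");
      System.out.println(new String(value, StandardCharsets.UTF_8));
    }
  }
}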
@@ -29,8 +29,10 @@
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_WORKING_DIRECTORY;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.PATH_CODEC;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.PERMISSIONS_TO_REPORT;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.flogger.LazyArgs.lazy;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.api.client.auth.oauth2.Credential;
import com.google.cloud.hadoop.gcsio.CreateFileOptions;
@@ -44,6 +46,7 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.GenerationReadConsistency;
import com.google.cloud.hadoop.gcsio.PathCodec;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.cloud.hadoop.gcsio.UpdatableItemInfo;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.AccessTokenProviderClassFromConfigFactory;
import com.google.cloud.hadoop.util.CredentialFactory;
@@ -55,11 +58,14 @@
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileInputStream;
@@ -76,14 +82,20 @@
import java.util.Comparator;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
@@ -208,6 +220,13 @@ public int getByteLength() {
*/
private boolean enableAutoRepairImplicitDirectories =
GCS_REPAIR_IMPLICIT_DIRECTORIES_ENABLE.getDefault();
private static final String XATTR_KEY_PREFIX = "GHFS_XATTR_";

// Use an empty array as the null value, because the GCS API already uses a null value to remove a metadata key
private static final byte[] XATTR_NULL_VALUE = new byte[0];

private static final ThreadFactory DAEMON_THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("ghfs-thread-%d").setDaemon(true).build();

@VisibleForTesting
boolean enableFlatGlob = GCS_FLAT_GLOB_ENABLE.getDefault();
@@ -1669,7 +1688,7 @@ public void configureBuckets(String systemBucketName, boolean createSystemBucket
}
}

logger.atFine().log("GHFS.configureBuckets:=>");
logger.atFine().log("GHFS.configureBuckets:=> ");
}

/**
@@ -1873,6 +1892,141 @@ public void setTimes(Path p, long mtime, long atime)
logger.atFine().log("GHFS.setTimes:=> ");
}

/** Supported starting from Hadoop 2.x */
public byte[] getXAttr(Path path, String name) throws IOException {
logger.atFine().log("GHFS.getXAttr: %s, %s", path, name);
checkNotNull(path, "path should not be null");
checkNotNull(name, "name should not be null");

Map<String, byte[]> attributes = getGcsFs().getFileInfo(getGcsPath(path)).getAttributes();
String xAttrKey = getXAttrKey(name);
byte[] xAttr =
attributes.containsKey(xAttrKey) ? getXAttrValue(attributes.get(xAttrKey)) : null;

logger.atFine().log("GHFS.getXAttr:=> %s", lazy(() -> xAttr == null ? null : new String(xAttr, UTF_8)));
return xAttr;
}

/** Supported starting from Hadoop 2.x */
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
logger.atFine().log("GHFS.getXAttrs: %s", path);
checkNotNull(path, "path should not be null");

FileInfo fileInfo = getGcsFs().getFileInfo(getGcsPath(path));
Map<String, byte[]> xAttrs =
fileInfo.getAttributes().entrySet().stream()
.filter(a -> isXAttr(a.getKey()))
.collect(
HashMap::new,
(m, a) -> m.put(getXAttrName(a.getKey()), getXAttrValue(a.getValue())),
Map::putAll);

logger.atFine().log("GHFS.getXAttrs:=> %s", xAttrs);
return xAttrs;
}

/** Supported starting from Hadoop 2.x */
public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException {
logger.atFine().log("GHFS.getXAttrs: %s, %s", path, names);
checkNotNull(path, "path should not be null");
checkNotNull(names, "names should not be null");

Map<String, byte[]> xAttrs;
if (names.isEmpty()) {
xAttrs = new HashMap<>();
} else {
Set<String> namesSet = new HashSet<>(names);
xAttrs =
getXAttrs(path).entrySet().stream()
.filter(a -> namesSet.contains(a.getKey()))
.collect(HashMap::new, (m, a) -> m.put(a.getKey(), a.getValue()), Map::putAll);
}

logger.atFine().log("GHFS.getXAttrs:=> %s", xAttrs);
return xAttrs;
}

/** Supported starting from Hadoop 2.x */
public List<String> listXAttrs(Path path) throws IOException {
logger.atFine().log("GHFS.listXAttrs: %s", path);
checkNotNull(path, "path should not be null");

FileInfo fileInfo = getGcsFs().getFileInfo(getGcsPath(path));

List<String> xAttrs =
fileInfo.getAttributes().keySet().stream()
.filter(this::isXAttr)
.map(this::getXAttrName)
.collect(Collectors.toCollection(ArrayList::new));

logger.atFine().log("GHFS.listXAttrs:=> %s", xAttrs);
return xAttrs;
}

void setXAttrInternal(Path path, String name, byte[] value, boolean create, boolean replace)
throws IOException {
logger.atFine().log(
"GHFS.setXAttr: %s, %s, %s, %s, %s",
path, name, lazy(() -> value == null ? null : new String(value, UTF_8)), create, replace);
checkNotNull(path, "path should not be null");
checkNotNull(name, "name should not be null");

FileInfo fileInfo = getGcsFs().getFileInfo(getGcsPath(path));
String xAttrKey = getXAttrKey(name);
Map<String, byte[]> attributes = fileInfo.getAttributes();

if (attributes.containsKey(xAttrKey) && !replace) {
throw new IOException(
String.format(
"REPLACE flag must be set to update XAttr (name='%s', value='%s') for '%s'",
name, new String(value, UTF_8), path));
}
if (!attributes.containsKey(xAttrKey) && !create) {
throw new IOException(
String.format(
"CREATE flag must be set to create XAttr (name='%s', value='%s') for '%s'",
name, new String(value, UTF_8), path));
}

UpdatableItemInfo updateInfo =
new UpdatableItemInfo(
fileInfo.getItemInfo().getResourceId(),
ImmutableMap.of(xAttrKey, getXAttrValue(value)));
getGcsFs().getGcs().updateItems(ImmutableList.of(updateInfo));
logger.atFine().log("GHFS.setXAttr:=> ");
}

/** Supported starting from Hadoop 2.x */
public void removeXAttr(Path path, String name) throws IOException {
logger.atFine().log("GHFS.removeXAttr: %s, %s", path, name);
checkNotNull(path, "path should not be null");
checkNotNull(name, "name should not be null");

FileInfo fileInfo = getGcsFs().getFileInfo(getGcsPath(path));
Map<String, byte[]> xAttrToRemove = new HashMap<>();
xAttrToRemove.put(getXAttrKey(name), null);
UpdatableItemInfo updateInfo =
new UpdatableItemInfo(fileInfo.getItemInfo().getResourceId(), xAttrToRemove);
getGcsFs().getGcs().updateItems(ImmutableList.of(updateInfo));
logger.atFine().log("GHFS.removeXAttr:=> ");
}

private boolean isXAttr(String key) {
return key != null && key.startsWith(XATTR_KEY_PREFIX);
}

private String getXAttrKey(String name) {
return XATTR_KEY_PREFIX + name;
}

private String getXAttrName(String key) {
return key.substring(XATTR_KEY_PREFIX.length());
}

private byte[] getXAttrValue(byte[] value) {
return value == null ? XATTR_NULL_VALUE : value;
}

/** @deprecated use {@link GoogleHadoopFileSystemConfiguration#PERMISSIONS_TO_REPORT} */
@Deprecated
public static final String PERMISSIONS_TO_REPORT_KEY =
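
Also not part of the diff: a self-contained sketch of the metadata encoding the patch uses. Xattr names are stored as GCS object metadata keys under the GHFS_XATTR_ prefix, and an empty byte array stands in for a null value, since the GCS metadata API treats a null value as a request to remove the key (which is exactly what removeXAttr relies on above).

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class XAttrEncodingSketch {
  private static final String XATTR_KEY_PREFIX = "GHFS_XATTR_";
  private static final byte[] XATTR_NULL_VALUE = new byte[0];

  public static void main(String[] args) {
    Map<String, byte[]> metadata = new HashMap<>();

    // Encode: the Hadoop xattr name becomes a prefixed GCS metadata key.
    metadata.put(XATTR_KEY_PREFIX + "owner", "alice".getBytes(StandardCharsets.UTF_8));

    // A null xattr value is stored as an empty array, never as null,
    // so it cannot be confused with a "remove this key" update.
    metadata.put(XATTR_KEY_PREFIX + "empty", XATTR_NULL_VALUE);

    // Decode: strip the prefix to recover the xattr names.
    for (Map.Entry<String, byte[]> e : metadata.entrySet()) {
      if (e.getKey().startsWith(XATTR_KEY_PREFIX)) {
        String name = e.getKey().substring(XATTR_KEY_PREFIX.length());
        System.out.printf("%s -> %d bytes%n", name, e.getValue().length);
      }
    }
  }
}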