e) {
+ return size() > MAX_PROVIDERS;
+ }
+ };
+
+ private AccessTokenProvider instance = null;
+
+ /**
+ * Create handle for cached instance.
+ */
+ public CachedRefreshTokenBasedAccessTokenProvider() {
+ }
+
+ /**
+ * Gets the access token from internally cached
+ * ConfRefreshTokenBasedAccessTokenProvider instance.
+ *
+ * @return Valid OAuth2 access token for the user.
+ * @throws IOException when system error, internal server error or user error
+ */
+ @Override
+ public synchronized String getAccessToken() throws IOException {
+ return instance.getAccessToken();
+ }
+
+ /**
+ * @return A cached Configuration consistent with the parameters of this
+ * instance.
+ */
+ @Override
+ public synchronized Configuration getConf() {
+ return instance.getConf();
+ }
+
+ /**
+ * Configure cached instance. Note that the Configuration instance returned
+ * from subsequent calls to {@link #getConf() getConf} may be from a
+ * previous, cached entry.
+ * @param conf Configuration instance
+ */
+ @Override
+ public synchronized void setConf(Configuration conf) {
+ String id = conf.get(OAUTH_CLIENT_ID_KEY);
+ if (null == id) {
+ throw new IllegalArgumentException("Missing client ID");
+ }
+ synchronized (CACHE) {
+ instance = CACHE.get(id);
+ if (null == instance
+ || conf.getBoolean(FORCE_REFRESH, false)
+ || replace(instance, conf)) {
+ instance = newInstance();
+ // clone configuration
+ instance.setConf(new Configuration(conf));
+ CACHE.put(id, instance);
+ LOG.debug("Created new client {}", id);
+ }
+ }
+ }
+
+ AccessTokenProvider newInstance() {
+ return new ConfRefreshTokenBasedAccessTokenProvider();
+ }
+
+ private static boolean replace(AccessTokenProvider cached, Configuration c2) {
+ // ConfRefreshTokenBasedAccessTokenProvider::setConf asserts !null
+ final Configuration c1 = cached.getConf();
+ for (String key : new String[] {
+ OAUTH_REFRESH_TOKEN_KEY, OAUTH_REFRESH_URL_KEY }) {
+ if (!c1.get(key).equals(c2.get(key))) {
+ // replace cached instance for this clientID
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
new file mode 100644
index 0000000000..b444984dfd
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * public interface to expose OAuth2 authentication related features.
+ */
+package org.apache.hadoop.fs.adl.oauth2;
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
new file mode 100644
index 0000000000..98e6a776c5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Supporting classes for metrics instrumentation.
+ */
+package org.apache.hadoop.fs.adl;
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
new file mode 100644
index 0000000000..a7f932f5c1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+/**
+ * Constants.
+ */
+public final class ADLConfKeys {
+ public static final String
+ ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN =
+ "adl.feature.override.readahead.max.concurrent.connection";
+ public static final int
+ ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT = 2;
+ public static final String ADL_WEBSDK_VERSION_KEY = "ADLFeatureSet";
+ static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER =
+ "adl.debug.override.localuserasfileowner";
+ static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false;
+ static final String ADL_FEATURE_REDIRECT_OFF =
+ "adl.feature.override.redirection.off";
+ static final boolean ADL_FEATURE_REDIRECT_OFF_DEFAULT = true;
+ static final String ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED =
+ "adl.feature.override.getblocklocation.locally.bundled";
+ static final boolean ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT
+ = true;
+ static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD =
+ "adl.feature.override.readahead";
+ static final boolean ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT =
+ true;
+ static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE =
+ "adl.feature.override.readahead.max.buffersize";
+
+ static final int KB = 1024;
+ static final int MB = KB * KB;
+ static final int DEFAULT_BLOCK_SIZE = 4 * MB;
+ static final int DEFAULT_EXTENT_SIZE = 256 * MB;
+ static final int DEFAULT_TIMEOUT_IN_SECONDS = 120;
+ static final int
+ ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT =
+ 8 * MB;
+
+ private ADLConfKeys() {
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
new file mode 100644
index 0000000000..350c6e78ea
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+/**
+ * Responsible for holding buffered data in the process. Hold only 1 and only
+ * 1 buffer block in the memory. Buffer block
+ * information is for the given file and the offset from the which the block
+ * is fetched. Across the webhdfs instances if
+ * same buffer block has been used then backend trip is avoided. Buffer block
+ * is certainly important since ADL fetches
+ * large amount of data (Default is 4MB however can be configured through
+ * core-site.xml) from the backend.
+ * Observation is in case of ORC/Avro kind of compressed file, buffer block
+ * does not avoid few backend calls across
+ * webhdfs
+ * instances.
+ */
+final class BufferManager {
+ private static final BufferManager BUFFER_MANAGER_INSTANCE = new
+ BufferManager();
+ private static Object lock = new Object();
+ private Buffer buffer = null;
+ private String fileName;
+
+ /**
+ * Constructor.
+ */
+ private BufferManager() {
+ }
+
+ public static Object getLock() {
+ return lock;
+ }
+
+ public static BufferManager getInstance() {
+ return BUFFER_MANAGER_INSTANCE;
+ }
+
+ /**
+ * Validate if the current buffer block is of given stream.
+ *
+ * @param path ADL stream path
+ * @param offset Stream offset that caller is interested in
+ * @return True if the buffer block is available otherwise false
+ */
+ boolean hasValidDataForOffset(String path, long offset) {
+ if (this.fileName == null) {
+ return false;
+ }
+
+ if (!this.fileName.equals(path)) {
+ return false;
+ }
+
+ if (buffer == null) {
+ return false;
+ }
+
+ if ((offset < buffer.offset) || (offset >= (buffer.offset
+ + buffer.data.length))) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Clean buffer block.
+ */
+ void clear() {
+ buffer = null;
+ }
+
+ /**
+ * Validate if the current buffer block is of given stream. For now partial
+ * data available is not supported.
+ * Data must be available exactly or within the range of offset and size
+ * passed as parameter.
+ *
+ * @param path Stream path
+ * @param offset Offset of the stream
+ * @param size Size of the data from the offset of the stream caller
+ * interested in
+ * @return True if the data is available from the given offset and of the
+ * size caller is interested in.
+ */
+ boolean hasData(String path, long offset, int size) {
+
+ if (!hasValidDataForOffset(path, offset)) {
+ return false;
+ }
+
+ if ((size + offset) > (buffer.data.length + buffer.offset)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Return the buffer block from the requested offset. It is caller
+ * responsibility to check if the buffer block is
+ * of there interest and offset is valid.
+ *
+ * @param data Byte array to be filed from the buffer block
+ * @param offset Data to be fetched from the offset.
+ */
+ void get(byte[] data, long offset) {
+ System.arraycopy(buffer.data, (int) (offset - buffer.offset), data, 0,
+ data.length);
+ }
+
+ /**
+ * Create new empty buffer block of the given size.
+ *
+ * @param len Size of the buffer block.
+ * @return Empty byte array.
+ */
+ byte[] getEmpty(int len) {
+ return new byte[len];
+ }
+
+ /**
+ * This function allows caller to specify new buffer block for the stream
+ * which is pulled from the backend.
+ *
+ * @param data Buffer
+ * @param path Stream path to which buffer belongs to
+ * @param offset Stream offset where buffer start with
+ */
+ void add(byte[] data, String path, long offset) {
+ if (data == null) {
+ return;
+ }
+
+ buffer = new Buffer();
+ buffer.data = data;
+ buffer.offset = offset;
+ this.fileName = path;
+ }
+
+ /**
+ * @return Size of the buffer.
+ */
+ int getBufferSize() {
+ return buffer.data.length;
+ }
+
+ /**
+ * @return Stream offset where buffer start with
+ */
+ long getBufferOffset() {
+ return buffer.offset;
+ }
+
+ /**
+ * Buffer container.
+ */
+ static class Buffer {
+ private byte[] data;
+ private long offset;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
new file mode 100644
index 0000000000..89011d286a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
@@ -0,0 +1,1108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.web.resources.ADLFlush;
+import org.apache.hadoop.hdfs.web.resources.ADLGetOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLPostOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLPutOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLVersionInfo;
+import org.apache.hadoop.hdfs.web.resources.AppendADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
+import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.CreateADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
+import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.hdfs.web.resources.LeaseParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
+import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.hdfs.web.resources.PermissionParam;
+import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.hdfs.web.resources.ReadADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.VersionInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extended @see SWebHdfsFileSystem API. This class contains Azure data lake
+ * specific stability, Reliability and performance improvement.
+ *
+ * Motivation behind PrivateAzureDataLakeFileSystem to encapsulate dependent
+ * implementation on org.apache.hadoop.hdfs.web package to configure query
+ * parameters, configuration over HTTP request send to backend .. etc. This
+ * class should be refactored and moved under package org.apache.hadoop.fs
+ * .adl once the required dependent changes are made into ASF code.
+ */
+public class PrivateAzureDataLakeFileSystem extends SWebHdfsFileSystem {
+
+ public static final String SCHEME = "adl";
+
+ // Feature configuration
+ private boolean featureGetBlockLocationLocallyBundled = true;
+ private boolean featureConcurrentReadWithReadAhead = true;
+ private boolean featureRedirectOff = true;
+ private boolean featureFlushWhenEOF = true;
+ private boolean overrideOwner = false;
+ private int maxConcurrentConnection;
+ private int maxBufferSize;
+ private String userName;
+
+ /**
+ * Constructor.
+ */
+ public PrivateAzureDataLakeFileSystem() {
+ try {
+ userName = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException e) {
+ userName = "hadoop";
+ }
+ }
+
+ @Override
+ public synchronized void initialize(URI uri, Configuration conf)
+ throws IOException {
+ super.initialize(uri, conf);
+ overrideOwner = getConf()
+ .getBoolean(ADLConfKeys.ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
+ ADLConfKeys.ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
+
+ featureRedirectOff = getConf()
+ .getBoolean(ADLConfKeys.ADL_FEATURE_REDIRECT_OFF,
+ ADLConfKeys.ADL_FEATURE_REDIRECT_OFF_DEFAULT);
+
+ featureGetBlockLocationLocallyBundled = getConf()
+ .getBoolean(ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED,
+ ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT);
+
+ featureConcurrentReadWithReadAhead = getConf().
+ getBoolean(ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD,
+ ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT);
+
+ maxBufferSize = getConf().getInt(
+ ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE,
+ ADLConfKeys
+ .ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT);
+
+ maxConcurrentConnection = getConf().getInt(
+ ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN,
+ ADLConfKeys
+ .ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT);
+ }
+
+ @VisibleForTesting
+ protected boolean isFeatureGetBlockLocationLocallyBundled() {
+ return featureGetBlockLocationLocallyBundled;
+ }
+
+ @VisibleForTesting
+ protected boolean isFeatureConcurrentReadWithReadAhead() {
+ return featureConcurrentReadWithReadAhead;
+ }
+
+ @VisibleForTesting
+ protected boolean isFeatureRedirectOff() {
+ return featureRedirectOff;
+ }
+
+ @VisibleForTesting
+ protected boolean isOverrideOwnerFeatureOn() {
+ return overrideOwner;
+ }
+
+ @VisibleForTesting
+ protected int getMaxBufferSize() {
+ return maxBufferSize;
+ }
+
+ @VisibleForTesting
+ protected int getMaxConcurrentConnection() {
+ return maxConcurrentConnection;
+ }
+
+ @Override
+ public String getScheme() {
+ return SCHEME;
+ }
+
+ /**
+ * Constructing home directory locally is fine as long as Hadoop
+ * local user name and ADL user name relationship story is not fully baked
+ * yet.
+ *
+ * @return Hadoop local user home directory.
+ */
+ @Override
+ public final Path getHomeDirectory() {
+ try {
+ return makeQualified(new Path(
+ "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
+ } catch (IOException e) {
+ }
+
+ return new Path("/user/" + userName);
+ }
+
+ /**
+ * Azure data lake does not support user configuration for data replication
+ * hence not leaving system to query on
+ * azure data lake.
+ *
+ * Stub implementation
+ *
+ * @param p Not honoured
+ * @param replication Not honoured
+ * @return True hard coded since ADL file system does not support
+ * replication configuration
+ * @throws IOException No exception would not thrown in this case however
+ * aligning with parent api definition.
+ */
+ @Override
+ public final boolean setReplication(final Path p, final short replication)
+ throws IOException {
+ return true;
+ }
+
+ /**
+ * @param f File/Folder path
+ * @return FileStatus instance containing metadata information of f
+ * @throws IOException For any system error
+ */
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ FileStatus status = super.getFileStatus(f);
+
+ if (overrideOwner) {
+ FileStatus proxiedStatus = new FileStatus(status.getLen(),
+ status.isDirectory(), status.getReplication(), status.getBlockSize(),
+ status.getModificationTime(), status.getAccessTime(),
+ status.getPermission(), userName, "hdfs", status.getPath());
+ return proxiedStatus;
+ } else {
+ return status;
+ }
+ }
+
+ /**
+ * Create call semantic is handled differently in case of ADL. Create
+ * semantics is translated to Create/Append
+ * semantics.
+ * 1. No dedicated connection to server.
+ * 2. Buffering is locally done, Once buffer is full or flush is invoked on
+ * the by the caller. All the pending
+ * data is pushed to ADL as APPEND operation code.
+ * 3. On close - Additional call is send to server to close the stream, and
+ * release lock from the stream.
+ *
+ * Necessity of Create/Append semantics is
+ * 1. ADL backend server does not allow idle connection for longer duration
+ * . In case of slow writer scenario,
+ * observed connection timeout/Connection reset causing occasional job
+ * failures.
+ * 2. Performance boost to jobs which are slow writer, avoided network latency
+ * 3. ADL equally better performing with multiple of 4MB chunk as append
+ * calls.
+ *
+ * @param f File path
+ * @param permission Access permission for the newly created file
+ * @param overwrite Remove existing file and recreate new one if true
+ * otherwise throw error if file exist
+ * @param bufferSize Buffer size, ADL backend does not honour
+ * @param replication Replication count, ADL backend does not honour
+ * @param blockSize Block size, ADL backend does not honour
+ * @param progress Progress indicator
+ * @return FSDataOutputStream OutputStream on which application can push
+ * stream of bytes
+ * @throws IOException when system error, internal server error or user error
+ */
+ @Override
+ public FSDataOutputStream create(final Path f, final FsPermission permission,
+ final boolean overwrite, final int bufferSize, final short replication,
+ final long blockSize, final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+
+ return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
+ new PermissionParam(applyUMask(permission)),
+ new OverwriteParam(overwrite), new BufferSizeParam(bufferSize),
+ new ReplicationParam(replication), new BlockSizeParam(blockSize),
+ new ADLVersionInfo(VersionInfo.getVersion())), statistics) {
+ };
+ }
+
+ @Override
+ public FSDataOutputStream createNonRecursive(final Path f,
+ final FsPermission permission, final EnumSet flag,
+ final int bufferSize, final short replication, final long blockSize,
+ final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+
+ String leaseId = java.util.UUID.randomUUID().toString();
+ return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
+ new PermissionParam(applyUMask(permission)), new CreateFlagParam(flag),
+ new CreateParentParam(false), new BufferSizeParam(bufferSize),
+ new ReplicationParam(replication), new LeaseParam(leaseId),
+ new BlockSizeParam(blockSize),
+ new ADLVersionInfo(VersionInfo.getVersion())), statistics) {
+ };
+ }
+
+ /**
+ * Since defined as private in parent class, redefined to pass through
+ * Create api implementation.
+ *
+ * @param permission
+ * @return FsPermission list
+ */
+ private FsPermission applyUMask(FsPermission permission) {
+ FsPermission fsPermission = permission;
+ if (fsPermission == null) {
+ fsPermission = FsPermission.getDefault();
+ }
+ return fsPermission.applyUMask(FsPermission.getUMask(getConf()));
+ }
+
+ /**
+ * Open call semantic is handled differently in case of ADL. Instead of
+ * network stream is returned to the user,
+ * Overridden FsInputStream is returned.
+ *
+ * 1. No dedicated connection to server.
+ * 2. Process level concurrent read ahead Buffering is done, This allows
+ * data to be available for caller quickly.
+ * 3. Number of byte to read ahead is configurable.
+ *
+ * Advantage of Process level concurrent read ahead Buffering semantics is
+ * 1. ADL backend server does not allow idle connection for longer duration
+ * . In case of slow reader scenario,
+ * observed connection timeout/Connection reset causing occasional job
+ * failures.
+ * 2. Performance boost to jobs which are slow reader, avoided network latency
+ * 3. Compressed format support like ORC, and large data files gains the
+ * most out of this implementation.
+ *
+ * Read ahead feature is configurable.
+ *
+ * @param f File path
+ * @param buffersize Buffer size
+ * @return FSDataInputStream InputStream on which application can read
+ * stream of bytes
+ * @throws IOException when system error, internal server error or user error
+ */
+ @Override
+ public FSDataInputStream open(final Path f, final int buffersize)
+ throws IOException {
+ statistics.incrementReadOps(1);
+
+ final HttpOpParam.Op op = GetOpParam.Op.OPEN;
+ // use a runner so the open can recover from an invalid token
+ FsPathConnectionRunner runner = null;
+
+ if (featureConcurrentReadWithReadAhead) {
+ URL url = this.toUrl(op, f, new BufferSizeParam(buffersize),
+ new ReadADLNoRedirectParam(true),
+ new ADLVersionInfo(VersionInfo.getVersion()));
+
+ BatchByteArrayInputStream bb = new BatchByteArrayInputStream(url, f,
+ maxBufferSize, maxConcurrentConnection);
+
+ FSDataInputStream fin = new FSDataInputStream(bb);
+ return fin;
+ } else {
+ if (featureRedirectOff) {
+ runner = new FsPathConnectionRunner(ADLGetOpParam.Op.OPEN, f,
+ new BufferSizeParam(buffersize), new ReadADLNoRedirectParam(true),
+ new ADLVersionInfo(VersionInfo.getVersion()));
+ } else {
+ runner = new FsPathConnectionRunner(op, f,
+ new BufferSizeParam(buffersize));
+ }
+
+ return new FSDataInputStream(
+ new OffsetUrlInputStream(new UnresolvedUrlOpener(runner),
+ new OffsetUrlOpener(null)));
+ }
+ }
+
+ /**
+ * @param f File/Folder path
+ * @return FileStatus array list
+ * @throws IOException For system error
+ */
+ @Override
+ public FileStatus[] listStatus(final Path f) throws IOException {
+ FileStatus[] fileStatuses = super.listStatus(f);
+ for (int i = 0; i < fileStatuses.length; i++) {
+ if (overrideOwner) {
+ fileStatuses[i] = new FileStatus(fileStatuses[i].getLen(),
+ fileStatuses[i].isDirectory(), fileStatuses[i].getReplication(),
+ fileStatuses[i].getBlockSize(),
+ fileStatuses[i].getModificationTime(),
+ fileStatuses[i].getAccessTime(), fileStatuses[i].getPermission(),
+ userName, "hdfs", fileStatuses[i].getPath());
+ }
+ }
+ return fileStatuses;
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(final FileStatus status,
+ final long offset, final long length) throws IOException {
+ if (status == null) {
+ return null;
+ }
+
+ if (featureGetBlockLocationLocallyBundled) {
+ if ((offset < 0) || (length < 0)) {
+ throw new IllegalArgumentException("Invalid start or len parameter");
+ }
+
+ if (status.getLen() < offset) {
+ return new BlockLocation[0];
+ }
+
+ final String[] name = {"localhost"};
+ final String[] host = {"localhost"};
+ long blockSize = ADLConfKeys.DEFAULT_EXTENT_SIZE; // Block size must be
+ // non zero
+ int numberOfLocations =
+ (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
+ BlockLocation[] locations = new BlockLocation[numberOfLocations];
+ for (int i = 0; i < locations.length; i++) {
+ long currentOffset = offset + (i * blockSize);
+ long currentLength = Math
+ .min(blockSize, offset + length - currentOffset);
+ locations[i] = new BlockLocation(name, host, currentOffset,
+ currentLength);
+ }
+
+ return locations;
+ } else {
+ return getFileBlockLocations(status.getPath(), offset, length);
+ }
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
+ final long length) throws IOException {
+ statistics.incrementReadOps(1);
+
+ if (featureGetBlockLocationLocallyBundled) {
+ FileStatus fileStatus = getFileStatus(p);
+ return getFileBlockLocations(fileStatus, offset, length);
+ } else {
+ return super.getFileBlockLocations(p, offset, length);
+ }
+ }
+
+ enum StreamState {
+ Initial,
+ DataCachedInLocalBuffer,
+ StreamEnd
+ }
+
+ class BatchAppendOutputStream extends OutputStream {
+ private Path fsPath;
+ private Param, ?>[] parameters;
+ private byte[] data = null;
+ private int offset = 0;
+ private long length = 0;
+ private boolean eof = false;
+ private boolean hadError = false;
+ private byte[] dataBuffers = null;
+ private int bufSize = 0;
+ private boolean streamClosed = false;
+
+ public BatchAppendOutputStream(Path path, int bufferSize,
+ Param, ?>... param) throws IOException {
+ if (bufferSize < (ADLConfKeys.DEFAULT_BLOCK_SIZE)) {
+ bufSize = ADLConfKeys.DEFAULT_BLOCK_SIZE;
+ } else {
+ bufSize = bufferSize;
+ }
+
+ this.fsPath = path;
+ this.parameters = param;
+ this.data = getBuffer();
+ FSDataOutputStream createStream = null;
+ try {
+ if (featureRedirectOff) {
+ CreateADLNoRedirectParam skipRedirect = new CreateADLNoRedirectParam(
+ true);
+ Param, ?>[] tmpParam = featureFlushWhenEOF ?
+ new Param, ?>[param.length + 2] :
+ new Param, ?>[param.length + 1];
+ System.arraycopy(param, 0, tmpParam, 0, param.length);
+ tmpParam[param.length] = skipRedirect;
+ if (featureFlushWhenEOF) {
+ tmpParam[param.length + 1] = new ADLFlush(false);
+ }
+ createStream = new FsPathOutputStreamRunner(ADLPutOpParam.Op.CREATE,
+ fsPath, 1, tmpParam).run();
+ } else {
+ createStream = new FsPathOutputStreamRunner(PutOpParam.Op.CREATE,
+ fsPath, 1, param).run();
+ }
+ } finally {
+ if (createStream != null) {
+ createStream.close();
+ }
+ }
+ }
+
+ @Override
+ public final synchronized void write(int b) throws IOException {
+ if (streamClosed) {
+ throw new IOException(fsPath + " stream object is closed.");
+ }
+
+ if (offset == (data.length)) {
+ flush();
+ }
+
+ data[offset] = (byte) b;
+ offset++;
+
+ // Statistics will get incremented again as part of the batch updates,
+ // decrement here to avoid double value
+ if (statistics != null) {
+ statistics.incrementBytesWritten(-1);
+ }
+ }
+
+ @Override
+ public final synchronized void write(byte[] buf, int off, int len)
+ throws IOException {
+ if (streamClosed) {
+ throw new IOException(fsPath + " stream object is closed.");
+ }
+
+ int bytesToWrite = len;
+ int localOff = off;
+ int localLen = len;
+ if (localLen >= data.length) {
+ // Flush data that is already in our internal buffer
+ flush();
+
+ // Keep committing data until we have less than our internal buffers
+ // length left
+ do {
+ try {
+ commit(buf, localOff, data.length, eof);
+ } catch (IOException e) {
+ hadError = true;
+ throw e;
+ }
+ localOff += data.length;
+ localLen -= data.length;
+ } while (localLen >= data.length);
+ }
+
+ // At this point, we have less than data.length left to copy from users
+ // buffer
+ if (offset + localLen >= data.length) {
+ // Users buffer has enough data left to fill our internal buffer
+ int bytesToCopy = data.length - offset;
+ System.arraycopy(buf, localOff, data, offset, bytesToCopy);
+ offset += bytesToCopy;
+
+ // Flush our internal buffer
+ flush();
+ localOff += bytesToCopy;
+ localLen -= bytesToCopy;
+ }
+
+ if (localLen > 0) {
+ // Simply copy the remainder from the users buffer into our internal
+ // buffer
+ System.arraycopy(buf, localOff, data, offset, localLen);
+ offset += localLen;
+ }
+
+ // Statistics will get incremented again as part of the batch updates,
+ // decrement here to avoid double value
+ if (statistics != null) {
+ statistics.incrementBytesWritten(-bytesToWrite);
+ }
+ }
+
+ @Override
+ public final synchronized void flush() throws IOException {
+ if (streamClosed) {
+ throw new IOException(fsPath + " stream object is closed.");
+ }
+
+ if (offset > 0) {
+ try {
+ commit(data, 0, offset, eof);
+ } catch (IOException e) {
+ hadError = true;
+ throw e;
+ }
+ }
+
+ offset = 0;
+ }
+
+ @Override
+ public final synchronized void close() throws IOException {
+ // Stream is closed earlier, return quietly.
+ if(streamClosed) {
+ return;
+ }
+
+ if (featureRedirectOff) {
+ eof = true;
+ }
+
+ boolean flushedSomething = false;
+ if (hadError) {
+ // No point proceeding further since the error has occurred and
+ // stream would be required to upload again.
+ streamClosed = true;
+ return;
+ } else {
+ flushedSomething = offset > 0;
+ try {
+ flush();
+ } finally {
+ streamClosed = true;
+ }
+ }
+
+ if (featureRedirectOff) {
+ // If we didn't flush anything from our internal buffer, we have to
+ // call the service again
+ // with an empty payload and flush=true in the url
+ if (!flushedSomething) {
+ try {
+ commit(null, 0, ADLConfKeys.KB, true);
+ } finally {
+ streamClosed = true;
+ }
+ }
+ }
+ }
+
+ private void commit(byte[] buffer, int off, int len, boolean endOfFile)
+ throws IOException {
+ OutputStream out = null;
+ try {
+ if (featureRedirectOff) {
+ AppendADLNoRedirectParam skipRedirect = new AppendADLNoRedirectParam(
+ true);
+ Param, ?>[] tmpParam = featureFlushWhenEOF ?
+ new Param, ?>[parameters.length + 3] :
+ new Param, ?>[parameters.length + 1];
+ System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+ tmpParam[parameters.length] = skipRedirect;
+ if (featureFlushWhenEOF) {
+ tmpParam[parameters.length + 1] = new ADLFlush(endOfFile);
+ tmpParam[parameters.length + 2] = new OffsetParam(length);
+ }
+
+ out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
+ len, tmpParam).run();
+ } else {
+ out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
+ len, parameters).run();
+ }
+
+ if (buffer != null) {
+ out.write(buffer, off, len);
+ length += len;
+ }
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ private byte[] getBuffer() {
+ // Switch between the first and second buffer
+ dataBuffers = new byte[bufSize];
+ return dataBuffers;
+ }
+ }
+
+ /**
+ * Read data from backend in chunks instead of persistent connection. This
+ * is to avoid slow reader causing socket
+ * timeout.
+ */
+ protected class BatchByteArrayInputStream extends FSInputStream {
+
+ private static final int SIZE4MB = 4 * 1024 * 1024;
+ private final URL runner;
+ private byte[] data = null;
+ private long validDataHoldingSize = 0;
+ private int bufferOffset = 0;
+ private long currentFileOffset = 0;
+ private long nextFileOffset = 0;
+ private long fileSize = 0;
+ private StreamState state = StreamState.Initial;
+ private int maxBufferSize;
+ private int maxConcurrentConnection;
+ private Path fsPath;
+ private boolean streamIsClosed;
+ private Future[] subtasks = null;
+
+ BatchByteArrayInputStream(URL url, Path p, int bufferSize,
+ int concurrentConnection) throws IOException {
+ this.runner = url;
+ fsPath = p;
+ FileStatus fStatus = getFileStatus(fsPath);
+ if (!fStatus.isFile()) {
+ throw new IOException("Cannot open the directory " + p + " for " +
+ "reading");
+ }
+ fileSize = fStatus.getLen();
+ this.maxBufferSize = bufferSize;
+ this.maxConcurrentConnection = concurrentConnection;
+ this.streamIsClosed = false;
+ }
+
+ @Override
+ public synchronized final int read(long position, byte[] buffer, int offset,
+ int length) throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ long oldPos = this.getPos();
+
+ int nread1;
+ try {
+ this.seek(position);
+ nread1 = this.read(buffer, offset, length);
+ } finally {
+ this.seek(oldPos);
+ }
+
+ return nread1;
+ }
+
+ @Override
+ public synchronized final int read() throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ int status = doBufferAvailabilityCheck();
+ if (status == -1) {
+ return status;
+ }
+ int ch = data[bufferOffset++] & (0xff);
+ if (statistics != null) {
+ statistics.incrementBytesRead(1);
+ }
+ return ch;
+ }
+
+ @Override
+ public synchronized final void readFully(long position, byte[] buffer,
+ int offset, int length) throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+
+ super.readFully(position, buffer, offset, length);
+ if (statistics != null) {
+ statistics.incrementBytesRead(length);
+ }
+ }
+
+ @Override
+ public synchronized final int read(byte[] b, int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new IllegalArgumentException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ int status = doBufferAvailabilityCheck();
+ if (status == -1) {
+ return status;
+ }
+
+ int byteRead = 0;
+ long availableBytes = validDataHoldingSize - off;
+ long requestedBytes = bufferOffset + len - off;
+ if (requestedBytes <= availableBytes) {
+ System.arraycopy(data, bufferOffset, b, off, len);
+ bufferOffset += len;
+ byteRead = len;
+ } else {
+ byteRead = super.read(b, off, len);
+ }
+
+ if (statistics != null) {
+ statistics.incrementBytesRead(byteRead);
+ }
+
+ return byteRead;
+ }
+
+ private int doBufferAvailabilityCheck() throws IOException {
+ if (state == StreamState.Initial) {
+ validDataHoldingSize = fill(nextFileOffset);
+ }
+
+ long dataReloadSize = 0;
+ switch ((int) validDataHoldingSize) {
+ case -1:
+ state = StreamState.StreamEnd;
+ return -1;
+ case 0:
+ dataReloadSize = fill(nextFileOffset);
+ if (dataReloadSize <= 0) {
+ state = StreamState.StreamEnd;
+ return (int) dataReloadSize;
+ } else {
+ validDataHoldingSize = dataReloadSize;
+ }
+ break;
+ default:
+ break;
+ }
+
+ if (bufferOffset >= validDataHoldingSize) {
+ dataReloadSize = fill(nextFileOffset);
+ }
+
+ if (bufferOffset >= ((dataReloadSize == 0) ?
+ validDataHoldingSize :
+ dataReloadSize)) {
+ state = StreamState.StreamEnd;
+ return -1;
+ }
+
+ validDataHoldingSize = ((dataReloadSize == 0) ?
+ validDataHoldingSize :
+ dataReloadSize);
+ state = StreamState.DataCachedInLocalBuffer;
+ return 0;
+ }
+
+ private long fill(final long off) throws IOException {
+ if (state == StreamState.StreamEnd) {
+ return -1;
+ }
+
+ if (fileSize <= off) {
+ state = StreamState.StreamEnd;
+ return -1;
+ }
+ int len = maxBufferSize;
+ long fileOffset = 0;
+ boolean isEntireFileCached = true;
+ if ((fileSize <= maxBufferSize)) {
+ len = (int) fileSize;
+ currentFileOffset = 0;
+ nextFileOffset = 0;
+ } else {
+ if (len > (fileSize - off)) {
+ len = (int) (fileSize - off);
+ }
+
+ synchronized (BufferManager.getLock()) {
+ if (BufferManager.getInstance()
+ .hasValidDataForOffset(fsPath.toString(), off)) {
+ len = (int) (
+ BufferManager.getInstance().getBufferOffset() + BufferManager
+ .getInstance().getBufferSize() - (int) off);
+ }
+ }
+
+ if (len <= 0) {
+ len = maxBufferSize;
+ }
+ fileOffset = off;
+ isEntireFileCached = false;
+ }
+
+ data = null;
+ BufferManager bm = BufferManager.getInstance();
+ data = bm.getEmpty(len);
+ boolean fetchDataOverNetwork = false;
+ synchronized (BufferManager.getLock()) {
+ if (bm.hasData(fsPath.toString(), fileOffset, len)) {
+ try {
+ bm.get(data, fileOffset);
+ validDataHoldingSize = data.length;
+ currentFileOffset = fileOffset;
+ } catch (ArrayIndexOutOfBoundsException e) {
+ fetchDataOverNetwork = true;
+ }
+ } else {
+ fetchDataOverNetwork = true;
+ }
+ }
+
+ if (fetchDataOverNetwork) {
+ int splitSize = getSplitSize(len);
+ try {
+ validDataHoldingSize = fillDataConcurrently(data, len, fileOffset,
+ splitSize);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted filling buffer", e);
+ }
+
+ synchronized (BufferManager.getLock()) {
+ bm.add(data, fsPath.toString(), fileOffset);
+ }
+ currentFileOffset = nextFileOffset;
+ }
+
+ nextFileOffset += validDataHoldingSize;
+ state = StreamState.DataCachedInLocalBuffer;
+ bufferOffset = isEntireFileCached ? (int) off : 0;
+ return validDataHoldingSize;
+ }
+
+ int getSplitSize(int size) {
+ if (size <= SIZE4MB) {
+ return 1;
+ }
+
+ // Not practical
+ if (size > maxBufferSize) {
+ size = maxBufferSize;
+ }
+
+ int equalBufferSplit = Math.max(Math.round(size / SIZE4MB), 1);
+ int splitSize = Math.min(equalBufferSplit, maxConcurrentConnection);
+ return splitSize;
+ }
+
+ @Override
+ public synchronized final void seek(long pos) throws IOException {
+ if (pos == -1) {
+ throw new IOException("Bad offset, cannot seek to " + pos);
+ }
+
+ BufferManager bm = BufferManager.getInstance();
+ synchronized (BufferManager.getLock()) {
+ if (bm.hasValidDataForOffset(fsPath.toString(), pos)) {
+ state = StreamState.DataCachedInLocalBuffer;
+ } else if (pos >= 0) {
+ state = StreamState.Initial;
+ }
+ }
+
+ long availableBytes = (currentFileOffset + validDataHoldingSize);
+
+ // Check if this position falls under buffered data
+ if (pos < currentFileOffset || availableBytes <= 0) {
+ validDataHoldingSize = 0;
+ currentFileOffset = pos;
+ nextFileOffset = pos;
+ bufferOffset = 0;
+ return;
+ }
+
+ if (pos < availableBytes && pos >= currentFileOffset) {
+ state = StreamState.DataCachedInLocalBuffer;
+ bufferOffset = (int) (pos - currentFileOffset);
+ } else {
+ validDataHoldingSize = 0;
+ currentFileOffset = pos;
+ nextFileOffset = pos;
+ bufferOffset = 0;
+ }
+ }
+
+ @Override
+ public synchronized final long getPos() throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ return bufferOffset + currentFileOffset;
+ }
+
+ @Override
+ public synchronized final int available() throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public final boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ private int fillDataConcurrently(byte[] byteArray, int length,
+ long globalOffset, int splitSize)
+ throws IOException, InterruptedException {
+ ExecutorService executor = Executors.newFixedThreadPool(splitSize);
+ subtasks = new Future[splitSize];
+ for (int i = 0; i < splitSize; i++) {
+ int offset = i * (length / splitSize);
+ int splitLength = (splitSize == (i + 1)) ?
+ (length / splitSize) + (length % splitSize) :
+ (length / splitSize);
+ subtasks[i] = executor.submit(
+ new BackgroundReadThread(byteArray, offset, splitLength,
+ globalOffset + offset));
+ }
+
+ executor.shutdown();
+ // wait until all tasks are finished
+ executor.awaitTermination(ADLConfKeys.DEFAULT_TIMEOUT_IN_SECONDS,
+ TimeUnit.SECONDS);
+
+ int totalBytePainted = 0;
+ for (int i = 0; i < splitSize; ++i) {
+ try {
+ totalBytePainted += (Integer) subtasks[i].get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e.getCause());
+ } catch (ExecutionException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e.getCause());
+ }
+ }
+
+ if (totalBytePainted != length) {
+ throw new IOException("Expected " + length + " bytes, Got " +
+ totalBytePainted + " bytes");
+ }
+
+ return totalBytePainted;
+ }
+
+ @Override
+ public synchronized final void close() throws IOException {
+ synchronized (BufferManager.getLock()) {
+ BufferManager.getInstance().clear();
+ }
+ //need to cleanup the above code the stream and connection close doesn't
+ // happen here
+ //flag set to mark close happened, cannot use the stream once closed
+ streamIsClosed = true;
+ }
+
+ /**
+ * Reads data from the ADL backend from the specified global offset and
+ * given
+ * length. Read data from ADL backend is copied to buffer array from the
+ * offset value specified.
+ *
+ * @param buffer Store read data from ADL backend in the buffer.
+ * @param offset Store read data from ADL backend in the buffer
+ * from the
+ * offset.
+ * @param length Size of the data read from the ADL backend.
+ * @param globalOffset Read data from file offset.
+ * @return Number of bytes read from the ADL backend
+ * @throws IOException For any intermittent server issues or internal
+ * failures.
+ */
+ private int fillUpData(byte[] buffer, int offset, int length,
+ long globalOffset) throws IOException {
+ int totalBytesRead = 0;
+ final URL offsetUrl = new URL(
+ runner + "&" + new OffsetParam(String.valueOf(globalOffset)) + "&"
+ + new LengthParam(String.valueOf(length)));
+ HttpURLConnection conn = new URLRunner(GetOpParam.Op.OPEN, offsetUrl,
+ true).run();
+ InputStream in = conn.getInputStream();
+ try {
+ int bytesRead = 0;
+ while ((bytesRead = in.read(buffer, (int) offset + totalBytesRead,
+ (int) (length - totalBytesRead))) > 0) {
+ totalBytesRead += bytesRead;
+ }
+
+ // InputStream must be fully consumed to enable http keep-alive
+ if (bytesRead == 0) {
+ // Looking for EOF marker byte needs to be read.
+ if (in.read() != -1) {
+ throw new SocketException(
+ "Server returned more than requested data.");
+ }
+ }
+ } finally {
+ in.close();
+ conn.disconnect();
+ }
+
+ return totalBytesRead;
+ }
+
+ private class BackgroundReadThread implements Callable {
+
+ private final byte[] data;
+ private int offset;
+ private int length;
+ private long globalOffset;
+
+ BackgroundReadThread(byte[] buffer, int off, int size, long position) {
+ this.data = buffer;
+ this.offset = off;
+ this.length = size;
+ this.globalOffset = position;
+ }
+
+ public Object call() throws IOException {
+ return fillUpData(data, offset, length, globalOffset);
+ }
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
new file mode 100644
index 0000000000..d7dce250f0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Exposing AccessTokenProvider publicly to extend in com.microsoft.azure
+ * .datalake package. Extended version to cache
+ * token for the process to gain performance gain.
+ */
+@Private
+@Unstable
+public abstract class PrivateCachedRefreshTokenBasedAccessTokenProvider
+ extends AccessTokenProvider {
+
+ // visibility workaround
+
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
new file mode 100644
index 0000000000..7a9dffa1cf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * A distributed implementation of {@link
+ * org.apache.hadoop.hdfs.web.oauth2} for oauth2 token management support.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
new file mode 100644
index 0000000000..1cc82736f9
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * A distributed implementation of {@link org.apache.hadoop.hdfs.web} for
+ * reading and writing files on Azure data lake file system. This
+ * implementation is derivation from the webhdfs specification.
+ */
+package org.apache.hadoop.hdfs.web;
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
new file mode 100644
index 0000000000..b76aaaae55
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * Query parameter to notify backend server that the all the data has been
+ * pushed to over the stream.
+ *
+ * Used in operation code Create and Append.
+ */
+public class ADLFlush extends BooleanParam {
+ /**
+ * Parameter name.
+ */
+ public static final String NAME = "flush";
+
+ private static final Domain DOMAIN = new Domain(NAME);
+
+ /**
+ * Constructor.
+ *
+ * @param value the parameter value.
+ */
+ public ADLFlush(final Boolean value) {
+ super(DOMAIN, value);
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
new file mode 100644
index 0000000000..6b3708fcb4
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+import java.net.HttpURLConnection;
+
+/**
+ * Extended Webhdfs GetOpParam to avoid redirect operation for azure data
+ * lake storage.
+ */
+public class ADLGetOpParam extends HttpOpParam {
+ private static final Domain DOMAIN = new Domain(NAME, Op.class);
+
+ /**
+ * Constructor.
+ *
+ * @param str a string representation of the parameter value.
+ */
+ public ADLGetOpParam(final String str) {
+ super(DOMAIN, DOMAIN.parse(str));
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+
+ /**
+ * Get operations.
+ */
+ public static enum Op implements HttpOpParam.Op {
+ OPEN(false, HttpURLConnection.HTTP_OK);
+
+ private final boolean redirect;
+ private final int expectedHttpResponseCode;
+ private final boolean requireAuth;
+
+ Op(final boolean doRedirect, final int expectHttpResponseCode) {
+ this(doRedirect, expectHttpResponseCode, false);
+ }
+
+ Op(final boolean doRedirect, final int expectHttpResponseCode,
+ final boolean doRequireAuth) {
+ this.redirect = doRedirect;
+ this.expectedHttpResponseCode = expectHttpResponseCode;
+ this.requireAuth = doRequireAuth;
+ }
+
+ @Override
+ public HttpOpParam.Type getType() {
+ return HttpOpParam.Type.GET;
+ }
+
+ @Override
+ public boolean getRequireAuth() {
+ return requireAuth;
+ }
+
+ @Override
+ public boolean getDoOutput() {
+ return false;
+ }
+
+ @Override
+ public boolean getRedirect() {
+ return redirect;
+ }
+
+ @Override
+ public int getExpectedHttpResponseCode() {
+ return expectedHttpResponseCode;
+ }
+
+ @Override
+ public String toQueryString() {
+ return NAME + "=" + this;
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLPostOpParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLPostOpParam.java
new file mode 100644
index 0000000000..7f7e749327
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLPostOpParam.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+import java.net.HttpURLConnection;
+
+/**
+ * Extended Webhdfs PostOpParam to avoid redirect during append operation for
+ * azure data lake storage.
+ */
+
+public class ADLPostOpParam extends HttpOpParam {
+ private static final Domain DOMAIN = new Domain(NAME,
+ Op.class);
+
+ /**
+ * Constructor.
+ *
+ * @param str a string representation of the parameter value.
+ */
+ public ADLPostOpParam(final String str) {
+ super(DOMAIN, DOMAIN.parse(str));
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+
+ /**
+ * Post operations.
+ */
+ public static enum Op implements HttpOpParam.Op {
+ APPEND(true, false, HttpURLConnection.HTTP_OK);
+
+ private final boolean redirect;
+ private final boolean doOutput;
+ private final int expectedHttpResponseCode;
+
+ Op(final boolean doOut, final boolean doRedirect,
+ final int expectHttpResponseCode) {
+ this.doOutput = doOut;
+ this.redirect = doRedirect;
+ this.expectedHttpResponseCode = expectHttpResponseCode;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.POST;
+ }
+
+ @Override
+ public boolean getRequireAuth() {
+ return false;
+ }
+
+ @Override
+ public boolean getDoOutput() {
+ return doOutput;
+ }
+
+ @Override
+ public boolean getRedirect() {
+ return redirect;
+ }
+
+ @Override
+ public int getExpectedHttpResponseCode() {
+ return expectedHttpResponseCode;
+ }
+
+ /**
+ * @return a URI query string.
+ */
+ @Override
+ public String toQueryString() {
+ return NAME + "=" + this;
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLPutOpParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLPutOpParam.java
new file mode 100644
index 0000000000..d300a1c69e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLPutOpParam.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+import java.net.HttpURLConnection;
+
+/**
+ * Extended Webhdfs PutOpParam to avoid redirect during Create operation for
+ * azure data lake storage.
+ */
+public class ADLPutOpParam extends HttpOpParam {
+ private static final Domain DOMAIN = new Domain(NAME, Op.class);
+
+ /**
+ * Constructor.
+ *
+ * @param str a string representation of the parameter value.
+ */
+ public ADLPutOpParam(final String str) {
+ super(DOMAIN, DOMAIN.parse(str));
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+
+ /**
+ * Put operations.
+ */
+ public static enum Op implements HttpOpParam.Op {
+ CREATE(true, false, HttpURLConnection.HTTP_CREATED);
+
+ private final boolean redirect;
+ private final boolean doOutput;
+ private final int expectedHttpResponseCode;
+ private final boolean requireAuth;
+
+ Op(final boolean doOut, final boolean doRedirect,
+ final int expectHttpResponseCode) {
+ this.doOutput = doOut;
+ this.redirect = doRedirect;
+ this.expectedHttpResponseCode = expectHttpResponseCode;
+ this.requireAuth = false;
+ }
+
+ @Override
+ public HttpOpParam.Type getType() {
+ return HttpOpParam.Type.PUT;
+ }
+
+ @Override
+ public boolean getRequireAuth() {
+ return requireAuth;
+ }
+
+ @Override
+ public boolean getDoOutput() {
+ return doOutput;
+ }
+
+ @Override
+ public boolean getRedirect() {
+ return redirect;
+ }
+
+ @Override
+ public int getExpectedHttpResponseCode() {
+ return expectedHttpResponseCode;
+ }
+
+ @Override
+ public String toQueryString() {
+ return NAME + "=" + this;
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLVersionInfo.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLVersionInfo.java
new file mode 100644
index 0000000000..0bfe5219a8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLVersionInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+import org.apache.hadoop.hdfs.web.ADLConfKeys;
+
+import java.util.regex.Pattern;
+
+/**
+ * Capture ADL Jar version information. Require for debugging and analysis
+ * purpose in the backend.
+ */
+public class ADLVersionInfo extends StringParam {
+ /**
+ * Parameter name.
+ */
+ public static final String NAME = ADLConfKeys.ADL_WEBSDK_VERSION_KEY;
+
+ private static final StringParam.Domain DOMAIN = new StringParam.Domain(NAME,
+ Pattern.compile(".+"));
+
+ /**
+ * Constructor.
+ * @param featureSetVersion Enabled featured information
+ */
+ public ADLVersionInfo(String featureSetVersion) {
+ super(DOMAIN, featureSetVersion);
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/AppendADLNoRedirectParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/AppendADLNoRedirectParam.java
new file mode 100644
index 0000000000..b9ea79e4e5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/AppendADLNoRedirectParam.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * Overwrite parameter.
+ */
+public class AppendADLNoRedirectParam extends BooleanParam {
+ /**
+ * Parameter name.
+ */
+ public static final String NAME = "append";
+
+ private static final Domain DOMAIN = new Domain(NAME);
+
+ /**
+ * Constructor.
+ *
+ * @param value the parameter value.
+ */
+ public AppendADLNoRedirectParam(final Boolean value) {
+ super(DOMAIN, value);
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/CreateADLNoRedirectParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/CreateADLNoRedirectParam.java
new file mode 100644
index 0000000000..83f39702c3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/CreateADLNoRedirectParam.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * Overwrite parameter.
+ */
+public class CreateADLNoRedirectParam extends BooleanParam {
+ /**
+ * Parameter name.
+ */
+ public static final String NAME = "write";
+
+ private static final Domain DOMAIN = new Domain(NAME);
+
+ /**
+ * Constructor.
+ *
+ * @param value the parameter value.
+ */
+ public CreateADLNoRedirectParam(final Boolean value) {
+ super(DOMAIN, value);
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/LeaseParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/LeaseParam.java
new file mode 100644
index 0000000000..680123501d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/LeaseParam.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * To support single writer semantics. Notify to ADL backend if the stream
+ * needs to locked in order to protect
+ * concurrent write operation on the same stream.
+ *
+ * Used in append operation.
+ */
+public class LeaseParam extends StringParam {
+
+ public static final String NAME = "leaseId";
+ /**
+ * Default parameter value.
+ */
+ public static final String DEFAULT = NULL;
+
+ private static final StringParam.Domain DOMAIN = new StringParam.Domain(NAME,
+ null);
+
+ /**
+ * Constructor.
+ *
+ * @param str a string representation of the parameter value.
+ */
+ public LeaseParam(final String str) {
+ super(DOMAIN, str == null || str.equals(DEFAULT) ? null : str);
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ReadADLNoRedirectParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ReadADLNoRedirectParam.java
new file mode 100644
index 0000000000..a600161996
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ReadADLNoRedirectParam.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * Overwrite parameter.
+ */
+public class ReadADLNoRedirectParam extends BooleanParam {
+ /**
+ * Parameter name.
+ */
+ public static final String NAME = "read";
+
+ private static final Domain DOMAIN = new Domain(NAME);
+
+ /**
+ * Constructor.
+ *
+ * @param value the parameter value.
+ */
+ public ReadADLNoRedirectParam(final Boolean value) {
+ super(DOMAIN, value);
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/package-info.java
new file mode 100644
index 0000000000..2231cc2532
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * A distributed implementation of {@link
+ * org.apache.hadoop.hdfs.web.resources} for reading or extending query
+ * parameter for webhdfs specification. ADL
+ * specific
+ * query parameter also goes in the same package.
+ */
+package org.apache.hadoop.hdfs.web.resources;
diff --git a/hadoop-tools/hadoop-azure-datalake/src/site/markdown/index.md b/hadoop-tools/hadoop-azure-datalake/src/site/markdown/index.md
new file mode 100644
index 0000000000..4158c88aff
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/site/markdown/index.md
@@ -0,0 +1,219 @@
+
+
+# Hadoop Azure Data Lake Support
+
+* [Introduction](#Introduction)
+* [Features](#Features)
+* [Limitations](#Limitations)
+* [Usage](#Usage)
+ * [Concepts](#Concepts)
+ * [Webhdfs Compliance](#Webhdfs_Specification_Compliance)
+ * [OAuth2 Support](#OAuth2_Support)
+ * [Read Ahead Buffer Management](Read_Ahead_Buffer_Management)
+ * [Configuring Credentials & FileSystem](#Configuring_Credentials)
+ * [Accessing adl URLs](#Accessing_adl_URLs)
+* [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
+
+## Introduction
+
+The hadoop-azure-datalake module provides support for integration with
+[Azure Data Lake Store]( https://azure.microsoft.com/en-in/documentation/services/data-lake-store/).
+The jar file is named azure-datalake-store.jar.
+
+## Features
+
+* Read and write data stored in an Azure Data Lake Storage account.
+* Partial support for [Webhdfs Specification 2.7.0](https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-hdfs/WebHDFS.html)
+* Reference file system paths using URLs using the `adl` scheme for Secure Webhdfs i.e. SSL
+ encrypted access.
+* Can act as a source of data in a MapReduce job, or a sink.
+* Tested on both Linux and Windows.
+* Tested for scale.
+
+## Limitations
+Partial or no support for the following operations in [Webhdfs Specification 2.7.0](https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-hdfs/WebHDFS.html):
+
+* Operation on Symbolic Link
+* Proxy Users
+* File Truncate
+* File Checksum
+* File replication factor
+* Home Directory Partial supported based on OAuth2 token information and not the active user on Hadoop cluster.
+* Extended Attributes(XAttrs) Operations
+* Snapshot Operations
+* Delegation Token Operations
+* User and group information returned as ListStatus and GetFileStatus is in form of GUID associated in Azure Active Directory.
+
+## Usage
+
+### Concepts
+Azure Data Lake Storage access path syntax is
+
+ adl://.azuredatalakestore.net/
+
+Get started with azure data lake account with [https://azure.microsoft.com/en-in/documentation/articles/data-lake-store-get-started-portal/](https://azure.microsoft.com/en-in/documentation/articles/data-lake-store-get-started-portal/)
+
+#### Webhdfs Compliance
+Azure Data Lake Storage exposes a public REST endpoint as per [Webhdfs Specification 2.7.0](https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) to access storage file system.
+
+Syntax to access Azure data lake storage account over [Webhdfs Specification 2.7.0](https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) is
+
+ https://.azuredatalakestore.net/webhdfs/v1/?
+
+
+#### OAuth2 Support
+Usage of Azure Data Lake Storage requires OAuth2 bearer token to be present as part of the HTTPS header as per OAuth2 specification. Valid OAuth2 bearer token should be obtained from Azure Active Directory for valid users who have access to Azure Data Lake Storage Account.
+
+Azure Active Directory (Azure AD) is Microsoft’s multi-tenant cloud based directory and identity management service. See [https://azure.microsoft.com/en-in/documentation/articles/active-directory-whatis/](https://azure.microsoft.com/en-in/documentation/articles/active-directory-whatis/)
+
+Following sections describes on OAuth2 configuration in core-site.xml.
+
+#### Read Ahead Buffer Management
+Azure Data Lake Storage offers high throughput. To maximize throughput, applications can use this feature to buffer data concurrently, in memory during read operation. This data is cached in memory per process per stream.
+
+
+To Enable/Disable read ahead feature.
+
+
+ adl.feature.override.readahead
+ true
+
+ Enables read aheads in the ADL client, the feature is used to improve read throughput.
+ This works in conjunction with the value set in adl.feature.override.readahead.max.buffersize.
+ When set to false the read ahead feature is turned off.
+ Default : True if not configured.
+
+
+
+To configure read ahead buffer size.
+
+
+ adl.feature.override.readahead.max.buffersize
+ 8388608
+
+ Define maximum buffer size to cache read ahead data, this is allocated per process to
+ cache read ahead data. Applicable only when adl.feature.override.readahead is set to true.
+ Default : 8388608 Byte i.e. 8MB if not configured.
+
+
+
+To configure number of concurrent connection to Azure Data Lake Storage Account.
+
+
+ adl.feature.override.readahead.max.concurrent.connection
+ 2
+
+ Define maximum concurrent connection can be established to
+ read ahead. If the data size is<4MB then only 1 read n/w connection
+ is set. If the data size is >4MB but<8MB then 2 read n/w
+ connection
+ is set. Data >8MB then value set under the property would
+ take
+ effect. Applicable only when adl.feature.override.readahead is set
+ to true and buffer size is >8MB.
+ It is recommended to reset this property if the adl.feature.override.readahead.max.buffersize
+ is < 8MB to gain performance. Application has to consider
+ throttling
+ limit for the account as well before configuring large buffer size.
+
+
+
+## Configuring Credentials & FileSystem
+
+Update core-site.xml for OAuth2 configuration
+
+
+ dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch
+ 0
+
+
+
+ dfs.webhdfs.oauth2.credential
+ bearer.and.refresh.token
+
+
+
+ dfs.webhdfs.oauth2.access.token
+ NOT_SET
+
+
+
+ dfs.webhdfs.oauth2.refresh.url
+ https://login.windows.net/common/oauth2/token/
+
+
+
+ dfs.webhdfs.oauth2.access.token.provider
+ org.apache.hadoop.fs.adl.oauth2.CachedRefreshTokenBasedAccessTokenProvider
+
+
+Application require to set Client id and OAuth2 refresh token from Azure Active Directory associated with client id. See [https://github.com/AzureAD/azure-activedirectory-library-for-java](https://github.com/AzureAD/azure-activedirectory-library-for-java).
+
+**Do not share client id and refresh token, it must be kept secret.**
+
+
+ dfs.webhdfs.oauth2.client.id
+
+
+
+
+ dfs.webhdfs.oauth2.refresh.token
+
+
+
+For ADL FileSystem to take effect. Update core-site.xml with
+
+
+ fs.adl.impl
+ org.apache.hadoop.fs.adl.AdlFileSystem
+
+
+
+ fs.AbstractFileSystem.adl.impl
+ org.apache.hadoop.fs.adl.Adl
+
+
+
+### Accessing adl URLs
+
+After credentials are configured in core-site.xml, any Hadoop component may
+reference files in that Azure Data Lake Storage account by using URLs of the following
+format:
+
+ adl://.azuredatalakestore.net/
+
+The schemes `adl` identify a URL on a file system backed by Azure
+Data Lake Storage. `adl` utilizes encrypted HTTPS access for all interaction with
+the Azure Data Lake Storage API.
+
+For example, the following
+[FileSystem Shell](../hadoop-project-dist/hadoop-common/FileSystemShell.html)
+commands demonstrate access to a storage account named `youraccount`.
+
+ > hadoop fs -mkdir adl://yourcontainer.azuredatalakestore.net/testDir
+
+ > hadoop fs -put testFile adl://yourcontainer.azuredatalakestore.net/testDir/testFile
+
+ > hadoop fs -cat adl://yourcontainer.azuredatalakestore.net/testDir/testFile
+ test file content
+## Testing the azure-datalake-store Module
+The hadoop-azure module includes a full suite of unit tests. Most of the tests will run without additional configuration by running mvn test. This includes tests against mocked storage, which is an in-memory emulation of Azure Data Lake Storage.
+
+A selection of tests can run against the Azure Data Lake Storage. To run tests against Adl storage. Please configure contract-test-options.xml with Adl account information mentioned in the above sections. Also turn on contract test execution flag to trigger tests against Azure Data Lake Storage.
+
+
+ dfs.adl.test.contract.enable
+ true
+
diff --git a/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/oauth2/TestCachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/oauth2/TestCachedRefreshTokenBasedAccessTokenProvider.java
new file mode 100644
index 0000000000..e57d3a9498
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/oauth2/TestCachedRefreshTokenBasedAccessTokenProvider.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.fs.adl.oauth2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.oauth2.AccessTokenProvider;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
+
+/**
+ * Verify cache behavior of ConfRefreshTokenBasedAccessTokenProvider instances.
+ */
+public class TestCachedRefreshTokenBasedAccessTokenProvider {
+
+ private Configuration conf;
+
+ @Rule public TestName name = new TestName();
+ String clientId(int id) {
+ return name.getMethodName() + "_clientID" + id;
+ }
+
+ @Before
+ public void initConfig() {
+ conf = new Configuration(false);
+ conf.set(OAUTH_CLIENT_ID_KEY, clientId(0));
+ conf.set(OAUTH_REFRESH_TOKEN_KEY, "01234567890abcdef");
+ conf.set(OAUTH_REFRESH_URL_KEY, "http://dingo.invalid:80");
+ }
+
+ @Test
+ public void testCacheInstance() throws Exception {
+ final AccessTokenProvider inst0 = mock(AccessTokenProvider.class);
+ when(inst0.getConf()).thenReturn(conf);
+
+ // verify config
+ CachedRefreshTokenBasedAccessTokenProvider t1 = new MockProvider(inst0);
+ t1.setConf(conf);
+ verify(inst0).setConf(any(Configuration.class)); // cloned, not exact match
+
+ // verify cache hit
+ CachedRefreshTokenBasedAccessTokenProvider t2 =
+ new CachedRefreshTokenBasedAccessTokenProvider() {
+ @Override
+ AccessTokenProvider newInstance() {
+ fail("Failed to return cached instance");
+ return null;
+ }
+ };
+ t2.setConf(conf);
+
+ // verify force refresh
+ conf.setBoolean(
+ CachedRefreshTokenBasedAccessTokenProvider.FORCE_REFRESH, true);
+ final AccessTokenProvider inst1 = mock(AccessTokenProvider.class);
+ when(inst1.getConf()).thenReturn(conf);
+ CachedRefreshTokenBasedAccessTokenProvider t3 = new MockProvider(inst1);
+ t3.setConf(conf);
+ verify(inst1).setConf(any(Configuration.class));
+
+ // verify cache miss
+ conf.set(OAUTH_REFRESH_URL_KEY, "http://yak.invalid:80");
+ final AccessTokenProvider inst2 = mock(AccessTokenProvider.class);
+ when(inst2.getConf()).thenReturn(conf);
+ CachedRefreshTokenBasedAccessTokenProvider t4 = new MockProvider(inst2);
+ t4.setConf(conf);
+ verify(inst2).setConf(any(Configuration.class));
+ }
+
+ @Test
+ public void testCacheLimit() throws Exception {
+ final int iter = CachedRefreshTokenBasedAccessTokenProvider.MAX_PROVIDERS;
+ for (int i = 0; i < iter; ++i) {
+ conf.set(OAUTH_CLIENT_ID_KEY, clientId(i));
+ AccessTokenProvider inst = mock(AccessTokenProvider.class);
+ when(inst.getConf()).thenReturn(conf);
+ CachedRefreshTokenBasedAccessTokenProvider t = new MockProvider(inst);
+ t.setConf(conf);
+ verify(inst).setConf(any(Configuration.class));
+ }
+ // verify cache hit
+ for (int i = 0; i < iter; ++i) {
+ conf.set(OAUTH_CLIENT_ID_KEY, clientId(i));
+ CachedRefreshTokenBasedAccessTokenProvider t =
+ new CachedRefreshTokenBasedAccessTokenProvider() {
+ @Override
+ AccessTokenProvider newInstance() {
+ fail("Failed to return cached instance");
+ return null;
+ }
+ };
+ t.setConf(conf);
+ }
+
+ // verify miss, evict 0
+ conf.set(OAUTH_CLIENT_ID_KEY, clientId(iter));
+ final AccessTokenProvider inst = mock(AccessTokenProvider.class);
+ when(inst.getConf()).thenReturn(conf);
+ CachedRefreshTokenBasedAccessTokenProvider t = new MockProvider(inst);
+ t.setConf(conf);
+ verify(inst).setConf(any(Configuration.class));
+
+ // verify miss
+ conf.set(OAUTH_CLIENT_ID_KEY, clientId(0));
+ final AccessTokenProvider inst0 = mock(AccessTokenProvider.class);
+ when(inst0.getConf()).thenReturn(conf);
+ CachedRefreshTokenBasedAccessTokenProvider t0 = new MockProvider(inst0);
+ t0.setConf(conf);
+ verify(inst0).setConf(any(Configuration.class));
+ }
+
+ static class MockProvider extends CachedRefreshTokenBasedAccessTokenProvider {
+ private final AccessTokenProvider inst;
+ MockProvider(AccessTokenProvider inst) {
+ this.inst = inst;
+ }
+ @Override
+ AccessTokenProvider newInstance() {
+ return inst;
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml
index e1fbef11fe..9485741ae2 100644
--- a/hadoop-tools/hadoop-tools-dist/pom.xml
+++ b/hadoop-tools/hadoop-tools-dist/pom.xml
@@ -105,6 +105,12 @@
hadoop-sls
compile
+
+ org.apache.hadoop
+ hadoop-azure-datalake
+ compile
+ ${project.version}
+
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index bd5f784b05..a2c441bde1 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -46,6 +46,7 @@
hadoop-azure
hadoop-aws
hadoop-kafka
+ hadoop-azure-datalake