diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a19015acc1..9fc70f96c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -161,6 +161,10 @@ Release 0.23.1 - UNRELEASED HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker. (todd) + HDFS-2246. Enable reading a block directly from local file system + for a client on the same node as the block file. (Andrew Purtell, + Suresh Srinivas and Jitendra Nath Pandey via szetszwo) + BUG FIXES HDFS-2541. For a sufficiently large value of blocks, the DN Scanner diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java new file mode 100644 index 0000000000..d34d74d438 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.FSDataset; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; + +/** + * BlockReaderLocal enables local short circuited reads. If the DFS client is on + * the same machine as the datanode, then the client can read files directly + * from the local file system rather than going through the datanode for better + * performance.
+ * {@link BlockReaderLocal} works as follows: + * + */ +class BlockReaderLocal extends RemoteBlockReader2 { + public static final Log LOG = LogFactory.getLog(DFSClient.class); + + //Stores the cache and proxy for a local datanode. + private static class LocalDatanodeInfo { + private ClientDatanodeProtocol proxy = null; + private final Map cache; + + LocalDatanodeInfo() { + final int cacheSize = 10000; + final float hashTableLoadFactor = 0.75f; + int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1; + cache = Collections + .synchronizedMap(new LinkedHashMap( + hashTableCapacity, hashTableLoadFactor, true) { + private static final long serialVersionUID = 1; + + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + return size() > cacheSize; + } + }); + } + + private synchronized ClientDatanodeProtocol getDatanodeProxy( + DatanodeInfo node, Configuration conf, int socketTimeout) + throws IOException { + if (proxy == null) { + proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf, + socketTimeout); + } + return proxy; + } + + private synchronized void resetDatanodeProxy() { + if (null != proxy) { + RPC.stopProxy(proxy); + proxy = null; + } + } + + private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { + return cache.get(b); + } + + private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) { + cache.put(b, info); + } + + private void removeBlockLocalPathInfo(ExtendedBlock b) { + cache.remove(b); + } + } + + // Multiple datanodes could be running on the local machine. Store proxies in + // a map keyed by the ipc port of the datanode. + private static Map localDatanodeInfoMap = new HashMap(); + + private final FileInputStream dataIn; // reader for the data file + + private FileInputStream checksumIn; // reader for the checksum file + + private int offsetFromChunkBoundary; + + ByteBuffer dataBuff = null; + ByteBuffer checksumBuff = null; + + /** + * The only way this object can be instantiated. + */ + static BlockReaderLocal newBlockReader(Configuration conf, String file, + ExtendedBlock blk, Token token, DatanodeInfo node, + int socketTimeout, long startOffset, long length) throws IOException { + + LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node + .getIpcPort()); + // check the cache first + BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); + if (pathinfo == null) { + pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token); + } + + // check to see if the file exists. It may so happen that the + // HDFS file has been deleted and this block-lookup is occurring + // on behalf of a new HDFS file. This time, the block file could + // be residing in a different portion of the fs.data.dir directory. + // In this case, we remove this entry from the cache. The next + // call to this method will re-populate the cache. 
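The comment above captures the cache-invalidation strategy for LocalDatanodeInfo's block-path cache defined earlier in this file: an access-ordered LinkedHashMap capped at 10,000 entries, from which a stale entry is removed on failure and repopulated on the next lookup. A minimal standalone sketch of that caching pattern, with illustrative class and method names that are not part of the patch:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative stand-in for the ExtendedBlock -> BlockLocalPathInfo cache.
    class BoundedLruCache<K, V> {
      private static final int MAX_ENTRIES = 10000;   // same bound used by LocalDatanodeInfo
      private final Map<K, V> cache = Collections.synchronizedMap(
          new LinkedHashMap<K, V>(16, 0.75f, true) {  // true => access-ordered (LRU)
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
              return size() > MAX_ENTRIES;            // evict the least recently used entry
            }
          });

      V get(K key) { return cache.get(key); }
      void put(K key, V value) { cache.put(key, value); }
      void invalidate(K key) { cache.remove(key); }   // e.g. when the local block file cannot be opened
    }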
+ FileInputStream dataIn = null; + FileInputStream checksumIn = null; + BlockReaderLocal localBlockReader = null; + boolean skipChecksumCheck = skipChecksumCheck(conf); + try { + // get a local file system + File blkfile = new File(pathinfo.getBlockPath()); + dataIn = new FileInputStream(blkfile); + + if (LOG.isDebugEnabled()) { + LOG.debug("New BlockReaderLocal for file " + blkfile + " of size " + + blkfile.length() + " startOffset " + startOffset + " length " + + length + " short circuit checksum " + skipChecksumCheck); + } + + if (!skipChecksumCheck) { + // get the metadata file + File metafile = new File(pathinfo.getMetaPath()); + checksumIn = new FileInputStream(metafile); + + // read and handle the common header here. For now just a version + BlockMetadataHeader header = BlockMetadataHeader + .readHeader(new DataInputStream(checksumIn)); + short version = header.getVersion(); + if (version != FSDataset.METADATA_VERSION) { + LOG.warn("Wrong version (" + version + ") for metadata file for " + + blk + " ignoring ..."); + } + DataChecksum checksum = header.getChecksum(); + long firstChunkOffset = startOffset + - (startOffset % checksum.getBytesPerChecksum()); + localBlockReader = new BlockReaderLocal(conf, file, blk, token, + startOffset, length, pathinfo, checksum, true, dataIn, + firstChunkOffset, checksumIn); + } else { + localBlockReader = new BlockReaderLocal(conf, file, blk, token, + startOffset, length, pathinfo, dataIn); + } + } catch (IOException e) { + // remove from cache + localDatanodeInfo.removeBlockLocalPathInfo(blk); + DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk + + " from cache because local file " + pathinfo.getBlockPath() + + " could not be opened."); + throw e; + } finally { + if (localBlockReader == null) { + if (dataIn != null) { + dataIn.close(); + } + if (checksumIn != null) { + checksumIn.close(); + } + } + } + return localBlockReader; + } + + private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) { + LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port); + if (ldInfo == null) { + ldInfo = new LocalDatanodeInfo(); + localDatanodeInfoMap.put(port, ldInfo); + } + return ldInfo; + } + + private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk, + DatanodeInfo node, Configuration conf, int timeout, + Token token) throws IOException { + LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort); + BlockLocalPathInfo pathinfo = null; + ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node, + conf, timeout); + try { + // make RPC to local datanode to find local pathnames of blocks + pathinfo = proxy.getBlockLocalPathInfo(blk, token); + if (pathinfo != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cached location of block " + blk + " as " + pathinfo); + } + localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); + } + } catch (IOException e) { + localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error + throw e; + } + return pathinfo; + } + + private static boolean skipChecksumCheck(Configuration conf) { + return conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); + } + + private BlockReaderLocal(Configuration conf, String hdfsfile, + ExtendedBlock block, Token token, long startOffset, + long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) + throws IOException { + this(conf, hdfsfile, block, token, startOffset, length, pathinfo, + 
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 4), false, + dataIn, startOffset, null); + } + + private BlockReaderLocal(Configuration conf, String hdfsfile, + ExtendedBlock block, Token token, long startOffset, + long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, + boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, + FileInputStream checksumIn) throws IOException { + super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn + .getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset, + length, null); + this.dataIn = dataIn; + this.checksumIn = checksumIn; + this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); + dataBuff = bufferPool.getBuffer(bytesPerChecksum*64); + checksumBuff = bufferPool.getBuffer(checksumSize*64); + //Initially the buffers have nothing to read. + dataBuff.flip(); + checksumBuff.flip(); + long toSkip = firstChunkOffset; + while (toSkip > 0) { + long skipped = dataIn.skip(toSkip); + if (skipped == 0) { + throw new IOException("Couldn't initialize input stream"); + } + toSkip -= skipped; + } + if (checksumIn != null) { + long checkSumOffset = (firstChunkOffset / bytesPerChecksum) + * checksumSize; + while (checkSumOffset > 0) { + long skipped = checksumIn.skip(checkSumOffset); + if (skipped == 0) { + throw new IOException("Couldn't initialize checksum input stream"); + } + checkSumOffset -= skipped; + } + } + } + + private int readIntoBuffer(FileInputStream stream, ByteBuffer buf) + throws IOException { + int bytesRead = stream.getChannel().read(buf); + if (bytesRead < 0) { + //EOF + return bytesRead; + } + while (buf.remaining() > 0) { + int n = stream.getChannel().read(buf); + if (n < 0) { + //EOF + return bytesRead; + } + bytesRead += n; + } + return bytesRead; + } + + @Override + public synchronized int read(byte[] buf, int off, int len) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.info("read off " + off + " len " + len); + } + if (!verifyChecksum) { + return dataIn.read(buf, off, len); + } else { + int dataRead = -1; + if (dataBuff.remaining() == 0) { + dataBuff.clear(); + checksumBuff.clear(); + dataRead = readIntoBuffer(dataIn, dataBuff); + readIntoBuffer(checksumIn, checksumBuff); + checksumBuff.flip(); + dataBuff.flip(); + if (verifyChecksum) { + checksum.verifyChunkedSums(dataBuff, checksumBuff, filename, + this.startOffset); + } + } else { + dataRead = dataBuff.remaining(); + } + if (dataRead > 0) { + int nRead = Math.min(dataRead - offsetFromChunkBoundary, len); + if (offsetFromChunkBoundary > 0) { + dataBuff.position(offsetFromChunkBoundary); + // Its either end of file or dataRead is greater than the + // offsetFromChunkBoundary + offsetFromChunkBoundary = 0; + } + if (nRead > 0) { + dataBuff.get(buf, off, nRead); + return nRead; + } else { + return 0; + } + } else { + return -1; + } + } + } + + @Override + public synchronized long skip(long n) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("skip " + n); + } + if (!verifyChecksum) { + return dataIn.skip(n); + } else { + return super.skip(n); + } + } + + @Override + public synchronized void close() throws IOException { + dataIn.close(); + if (checksumIn != null) { + checksumIn.close(); + } + if (dataBuff != null) { + bufferPool.returnBuffer(dataBuff); + dataBuff = null; + } + if (checksumBuff != null) { + bufferPool.returnBuffer(checksumBuff); + checksumBuff = null; + } + super.close(); + } +} \ No newline at end of file diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index ab65087663..0dc224184c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -25,12 +25,18 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.NetworkInterface; import java.net.Socket; +import java.net.SocketException; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.net.SocketFactory; @@ -78,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; @@ -100,6 +107,7 @@ import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; /******************************************************** * DFSClient can connect to a Hadoop Filesystem and @@ -230,7 +238,8 @@ Conf getConf() { */ private final Map filesBeingWritten = new HashMap(); - + private boolean shortCircuitLocalReads; + /** * Same as this(NameNode.getAddress(conf), conf); * @see #DFSClient(InetSocketAddress, Configuration) @@ -293,6 +302,13 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf, "Expecting exactly one of nameNodeAddr and rpcNamenode being null: " + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode); } + // read directly from the block file if configured. + this.shortCircuitLocalReads = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); + if (LOG.isDebugEnabled()) { + LOG.debug("Short circuit read is " + shortCircuitLocalReads); + } } /** @@ -499,6 +515,82 @@ public long renewDelegationToken(Token token) } } + /** + * Get {@link BlockReader} for short circuited local reads. 
+ */ + static BlockReader getLocalBlockReader(Configuration conf, + String src, ExtendedBlock blk, Token accessToken, + DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock) + throws InvalidToken, IOException { + try { + return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken, + chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes() + - offsetIntoBlock); + } catch (RemoteException re) { + throw re.unwrapRemoteException(InvalidToken.class, + AccessControlException.class); + } + } + + private static Set localIpAddresses = Collections + .synchronizedSet(new HashSet()); + + private static boolean isLocalAddress(InetSocketAddress targetAddr) { + InetAddress addr = targetAddr.getAddress(); + if (localIpAddresses.contains(addr.getHostAddress())) { + if (LOG.isTraceEnabled()) { + LOG.trace("Address " + targetAddr + " is local"); + } + return true; + } + + // Check if the address is any local or loop back + boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress(); + + // Check if the address is defined on any interface + if (!local) { + try { + local = NetworkInterface.getByInetAddress(addr) != null; + } catch (SocketException e) { + local = false; + } + } + if (LOG.isTraceEnabled()) { + LOG.trace("Address " + targetAddr + " is local"); + } + if (local == true) { + localIpAddresses.add(addr.getHostAddress()); + } + return local; + } + + /** + * Should the block access token be refetched on an exception + * + * @param ex Exception received + * @param targetAddr Target datanode address from where exception was received + * @return true if block access token has expired or invalid and it should be + * refetched + */ + private static boolean tokenRefetchNeeded(IOException ex, + InetSocketAddress targetAddr) { + /* + * Get a new access token and retry. Retry is needed in 2 cases. 1) When + * both NN and DN re-started while DFSClient holding a cached access token. + * 2) In the case that NN fails to update its access key at pre-set interval + * (by a wide margin) and subsequently restarts. In this case, DN + * re-registers itself with NN and receives a new access key, but DN will + * delete the old access key from its memory since it's considered expired + * based on the estimated expiration date. + */ + if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) { + LOG.info("Access token was invalid when connecting to " + targetAddr + + " : " + ex); + return true; + } + return false; + } + /** * Cancel a delegation token * @param token the token to cancel @@ -1590,7 +1682,7 @@ public ExtendedBlock getCurrentBlock() { synchronized List getAllBlocks() throws IOException { return ((DFSInputStream)in).getAllBlocks(); } - + /** * @return The visible length of the file. 
*/ @@ -1598,6 +1690,14 @@ public long getVisibleLength() throws IOException { return ((DFSInputStream)in).getFileLength(); } } + + boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) + throws IOException { + if (shortCircuitLocalReads && isLocalAddress(targetAddr)) { + return true; + } + return false; + } void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) { DatanodeInfo [] dnArr = { dn }; @@ -1620,4 +1720,8 @@ public String toString() { return getClass().getSimpleName() + "[clientName=" + clientName + ", ugi=" + ugi + "]"; } + + void disableShortCircuit() { + shortCircuitLocalReads = false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 763033537a..8ba74280a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -261,6 +261,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max"; public static final int DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500; + public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit"; + public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false; + public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum"; + public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false; // property for fsimage compression public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress"; @@ -301,4 +305,5 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_CHECKED_VOLUMES_KEY = "dfs.namenode.resource.checked.volumes"; public static final String DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal"; public static final String DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab"; + public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user"; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 53fa4d2ce6..2b817ffec0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -46,6 +46,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; /**************************************************************** @@ -405,11 +406,8 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { try { ExtendedBlock blk = targetBlock.getBlock(); Token accessToken = targetBlock.getBlockToken(); - - blockReader = getBlockReader( - targetAddr, src, blk, - accessToken, - offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, + blockReader = getBlockReader(targetAddr, chosenNode, src, blk, + accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, buffersize, verifyChecksum, 
dfsClient.clientName); if(connectFailedOnce) { DFSClient.LOG.info("Successfully connected to " + targetAddr + @@ -666,12 +664,9 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end, Token blockToken = block.getBlockToken(); int len = (int) (end - start + 1); - - reader = getBlockReader(targetAddr, src, - block.getBlock(), - blockToken, - start, len, buffersize, - verifyChecksum, dfsClient.clientName); + reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(), + blockToken, start, len, buffersize, verifyChecksum, + dfsClient.clientName); int nread = reader.readAll(buf, offset, len); if (nread != len) { throw new IOException("truncated return from reader.read(): " + @@ -684,6 +679,10 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end, e.getPos() + " from " + chosenNode.getName()); // we want to remember what we have tried addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap); + } catch (AccessControlException ex) { + DFSClient.LOG.warn("Short circuit access failed ", ex); + dfsClient.disableShortCircuit(); + continue; } catch (IOException e) { if (e instanceof InvalidBlockTokenException && refetchToken > 0) { DFSClient.LOG.info("Will get a new access token and retry, " @@ -726,6 +725,7 @@ private void closeBlockReader(BlockReader reader) throws IOException { * Otherwise, it will create a new connection. * * @param dnAddr Address of the datanode + * @param chosenNode Chosen datanode information * @param file File location * @param block The Block object * @param blockToken The access token for security @@ -737,6 +737,7 @@ private void closeBlockReader(BlockReader reader) throws IOException { * @return New BlockReader instance */ protected BlockReader getBlockReader(InetSocketAddress dnAddr, + DatanodeInfo chosenNode, String file, ExtendedBlock block, Token blockToken, @@ -746,6 +747,12 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr, boolean verifyChecksum, String clientName) throws IOException { + + if (dfsClient.shouldTryShortCircuitRead(dnAddr)) { + return DFSClient.getLocalBlockReader(dfsClient.conf, src, block, + blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset); + } + IOException err = null; boolean fromCache = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index fd7596a70e..1870369df8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; @@ -638,6 +639,14 @@ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( locatedBlock); } + /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */ + static ClientDatanodeProtocol createClientDatanodeProtocolProxy( + DatanodeID datanodeid, Configuration conf, int socketTimeout) + throws IOException { + return new org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolTranslatorR23( + datanodeid, conf, socketTimeout); + } + /** Create a {@link ClientDatanodeProtocol} proxy */ public static 
ClientDatanodeProtocol createClientDatanodeProtocolProxy( InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 2b2f77ecb4..1f5f12bda7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -85,7 +85,7 @@ public class RemoteBlockReader2 implements BlockReader { Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. private ReadableByteChannel in; - private DataChecksum checksum; + protected DataChecksum checksum; private PacketHeader curHeader; private ByteBuffer curPacketBuf = null; @@ -96,25 +96,25 @@ public class RemoteBlockReader2 implements BlockReader { private long lastSeqNo = -1; /** offset in block where reader wants to actually read */ - private long startOffset; - private final String filename; + protected long startOffset; + protected final String filename; - private static DirectBufferPool bufferPool = + protected static DirectBufferPool bufferPool = new DirectBufferPool(); private ByteBuffer headerBuf = ByteBuffer.allocate( PacketHeader.PKT_HEADER_LEN); - private int bytesPerChecksum; - private int checksumSize; + protected int bytesPerChecksum; + protected int checksumSize; /** * The total number of bytes we need to transfer from the DN. * This is the amount that the user has requested plus some padding * at the beginning so that the read can begin on a chunk boundary. */ - private long bytesNeededToFinish; + protected long bytesNeededToFinish; - private final boolean verifyChecksum; + protected final boolean verifyChecksum; private boolean sentStatusCode = false; @@ -271,7 +271,7 @@ private void readTrailingEmptyPacket() throws IOException { } } - private RemoteBlockReader2(String file, String bpid, long blockId, + protected RemoteBlockReader2(String file, String bpid, long blockId, ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) { // Path is used only for printing block and file information in debug diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java new file mode 100644 index 0000000000..a1823b3aad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * A block and the full path information to the block data file and + * the metadata file stored on the local file system. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockLocalPathInfo implements Writable { + static final WritableFactory FACTORY = new WritableFactory() { + public Writable newInstance() { return new BlockLocalPathInfo(); } + }; + static { // register a ctor + WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY); + } + + private ExtendedBlock block; + private String localBlockPath = ""; // local file storing the data + private String localMetaPath = ""; // local file storing the checksum + + public BlockLocalPathInfo() {} + + /** + * Constructs BlockLocalPathInfo. + * @param b The block corresponding to this lock path info. + * @param file Block data file. + * @param metafile Metadata file for the block. + */ + public BlockLocalPathInfo(ExtendedBlock b, String file, String metafile) { + block = b; + localBlockPath = file; + localMetaPath = metafile; + } + + /** + * Get the Block data file. + * @return Block data file. + */ + public String getBlockPath() {return localBlockPath;} + + /** + * Get the Block metadata file. + * @return Block metadata file. + */ + public String getMetaPath() {return localMetaPath;} + + @Override + public void write(DataOutput out) throws IOException { + block.write(out); + Text.writeString(out, localBlockPath); + Text.writeString(out, localMetaPath); + } + + @Override + public void readFields(DataInput in) throws IOException { + block = new ExtendedBlock(); + block.readFields(in); + localBlockPath = Text.readString(in); + localMetaPath = Text.readString(in); + } + + /** + * Get number of bytes in the block. + * @return Number of bytes in the block. 
+ */ + public long getNumBytes() { + return block.getNumBytes(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 1eb33d02d5..c0efa52ec5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -22,9 +22,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; /** An client-datanode protocol for block recovery @@ -82,5 +84,30 @@ public interface ClientDatanodeProtocol extends VersionedProtocol { * deleted along with its contents. * @throws IOException */ - void deleteBlockPool(String bpid, boolean force) throws IOException; + void deleteBlockPool(String bpid, boolean force) throws IOException; + + /** + * Retrieves the path names of the block file and metadata file stored on the + * local file system. + * + * In order for this method to work, one of the following should be satisfied: + *
+ * <ul>
+ * <li>The client user must be configured at the datanode to be able to use this
+ * method.</li>
+ * <li>When security is enabled, kerberos authentication must be used to connect
+ * to the datanode.</li>
+ * </ul>
+ * + * @param block + * the specified block on the local datanode + * @param token + * the block access token. + * @return the BlockLocalPathInfo of a block + * @throws IOException + * on error + */ + BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, + Token token) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java index 05297bad22..e13f7db3c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java @@ -21,9 +21,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.token.Token; /** * This class is used on the server side. @@ -116,4 +120,10 @@ public void refreshNamenodes() throws IOException { public void deleteBlockPool(String bpid, boolean force) throws IOException { server.deleteBlockPool(bpid, force); } + + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, + Token token) throws IOException { + return server.getBlockLocalPathInfo(block, token); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java index 9e384dd95c..9912f81f82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java @@ -26,14 +26,17 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; /** @@ -63,6 +66,23 @@ public ClientDatanodeProtocolTranslatorR23(InetSocketAddress addr, rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory); } + /** + * Constructor. + * @param datanodeid Datanode to connect to. + * @param conf Configuration. 
+ * @param socketTimeout Socket timeout to use. + * @throws IOException + */ + public ClientDatanodeProtocolTranslatorR23(DatanodeID datanodeid, + Configuration conf, int socketTimeout) throws IOException { + InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost() + + ":" + datanodeid.getIpcPort()); + rpcProxy = RPC.getProxy(ClientDatanodeWireProtocol.class, + ClientDatanodeWireProtocol.versionID, addr, + UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), socketTimeout); + } + static ClientDatanodeWireProtocol createClientDatanodeProtocolProxy( DatanodeID datanodeid, Configuration conf, int socketTimeout, LocatedBlock locatedBlock) @@ -134,4 +154,9 @@ public void deleteBlockPool(String bpid, boolean force) throws IOException { } + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, + Token token) throws IOException { + return rpcProxy.getBlockLocalPathInfo(block, token); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java index 551d21007c..819e9c6109 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java @@ -24,11 +24,15 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; import org.apache.hadoop.ipc.ProtocolInfo; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; /** @@ -77,6 +81,13 @@ public interface ClientDatanodeWireProtocol extends VersionedProtocol { */ void deleteBlockPool(String bpid, boolean force) throws IOException; + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)} + */ + BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, + Token token) throws IOException; + /** * This method is defined to get the protocol signature using * the R23 protocol - hence we have added the suffix of 2 to the method name diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java index 65dd34928b..fd8aec7bac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java @@ -28,6 +28,9 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.classification.InterfaceAudience; +import 
org.apache.hadoop.classification.InterfaceStability; + /** @@ -35,7 +38,9 @@ * This is not related to the Block related functionality in Namenode. * The biggest part of data block metadata is CRC for the block. */ -class BlockMetadataHeader { +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockMetadataHeader { static final short METADATA_VERSION = FSDataset.METADATA_VERSION; @@ -51,12 +56,14 @@ class BlockMetadataHeader { this.checksum = checksum; this.version = version; } - - short getVersion() { + + /** Get the version */ + public short getVersion() { return version; } - DataChecksum getChecksum() { + /** Get the checksum */ + public DataChecksum getChecksum() { return checksum; } @@ -67,7 +74,7 @@ DataChecksum getChecksum() { * @return Metadata Header * @throws IOException */ - static BlockMetadataHeader readHeader(DataInputStream in) throws IOException { + public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException { return readHeader(in.readShort(), in); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 40e8fb3467..65ccba80dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -54,6 +54,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -89,6 +90,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -110,6 +112,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -130,6 +133,7 @@ import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23; import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23; import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol; @@ -145,8 +149,10 @@ import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import 
org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -398,6 +404,8 @@ void refreshNamenodes(Configuration conf) private AbstractList dataDirs; private Configuration conf; + private final String userWithLocalPathAccess; + /** * Create the DataNode given a configuration and an array of dataDirs. * 'dataDirs' is where the blocks are stored. @@ -416,6 +424,8 @@ void refreshNamenodes(Configuration conf) final SecureResources resources) throws IOException { super(conf); + this.userWithLocalPathAccess = conf + .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY); try { hostName = getHostName(conf); startDataNode(conf, dataDirs, resources); @@ -1074,6 +1084,68 @@ static String createNewStorageId(int port) { return "DS-" + rand + "-" + ip + "-" + port + "-" + System.currentTimeMillis(); } + + /** Ensure the authentication method is kerberos */ + private void checkKerberosAuthMethod(String msg) throws IOException { + // User invoking the call must be same as the datanode user + if (!UserGroupInformation.isSecurityEnabled()) { + return; + } + if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != + AuthenticationMethod.KERBEROS) { + throw new AccessControlException("Error in " + msg + + "Only kerberos based authentication is allowed."); + } + } + + private void checkBlockLocalPathAccess() throws IOException { + checkKerberosAuthMethod("getBlockLocalPathInfo()"); + String currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); + if (!currentUser.equals(this.userWithLocalPathAccess)) { + throw new AccessControlException( + "Can't continue with getBlockLocalPathInfo() " + + "authorization. The user " + currentUser + + " is not allowed to call getBlockLocalPathInfo"); + } + } + + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, + Token token) throws IOException { + checkBlockLocalPathAccess(); + checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); + BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); + if (LOG.isDebugEnabled()) { + if (info != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("getBlockLocalPathInfo successful block=" + block + + " blockfile " + info.getBlockPath() + " metafile " + + info.getMetaPath()); + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("getBlockLocalPathInfo for block=" + block + + " returning null"); + } + } + } + metrics.incrBlocksGetLocalPathInfo(); + return info; + } + + private void checkBlockToken(ExtendedBlock block, Token token, + AccessMode accessMode) throws IOException { + if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) { + BlockTokenIdentifier id = new BlockTokenIdentifier(); + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + id.readFields(in); + if (LOG.isDebugEnabled()) { + LOG.debug("Got: " + id.toString()); + } + blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode); + } + } /** * Shut down this instance of the datanode. 
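The datanode-side checks added above mean that a getBlockLocalPathInfo() caller must, when security is enabled, authenticate with kerberos, match the user configured under dfs.block.local-path-access.user, and present a block token with READ access. A hedged sketch of how a caller could exercise the new RPC, loosely following the pattern used in TestShortCircuitLocalRead later in this patch (the timeout value is illustrative, and the code is assumed to live in the org.apache.hadoop.hdfs package so that the package-private DFSUtil helper is visible):

    package org.apache.hadoop.hdfs;

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    class LocalPathLookupSketch {
      static BlockLocalPathInfo lookup(final DatanodeInfo dn, final Configuration conf,
          ExtendedBlock blk, Token<BlockTokenIdentifier> token) throws Exception {
        // The caller's short user name must equal dfs.block.local-path-access.user on the
        // datanode; otherwise checkBlockLocalPathAccess() rejects the call.
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        ClientDatanodeProtocol proxy = ugi.doAs(
            new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
              @Override
              public ClientDatanodeProtocol run() throws Exception {
                // 60 second socket timeout, as in the test; adjust as needed.
                return DFSUtil.createClientDatanodeProtocolProxy(dn, conf, 60000);
              }
            });
        // Returns the absolute paths of the block data file and its metadata file.
        return proxy.getBlockLocalPathInfo(blk, token);
      }
    }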
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java index fee2760490..f885c8b21c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -2658,4 +2659,14 @@ public synchronized void deleteBlockPool(String bpid, boolean force) volume.deleteBPDirectories(bpid, force); } } + + @Override // FSDatasetInterface + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) + throws IOException { + File datafile = getBlockFile(block); + File metafile = getMetaFile(datafile, block.getGenerationStamp()); + BlockLocalPathInfo info = new BlockLocalPathInfo(block, + datafile.getAbsolutePath(), metafile.getAbsolutePath()); + return info; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java index 38017cfdb8..2f05f16c2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java @@ -19,6 +19,7 @@ import java.io.Closeable; +import java.io.File; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -402,4 +404,9 @@ public ReplicaInfo updateReplicaUnderRecovery( * @throws IOException */ public void deleteBlockPool(String bpid, boolean force) throws IOException; + + /** + * Get {@link BlockLocalPathInfo} for the given block. 
+ **/ + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 4df11d3434..9e18007810 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -60,6 +60,7 @@ public class DataNodeMetrics { @Metric MutableCounterLong readsFromRemoteClient; @Metric MutableCounterLong writesFromLocalClient; @Metric MutableCounterLong writesFromRemoteClient; + @Metric MutableCounterLong blocksGetLocalPathInfo; @Metric MutableCounterLong volumeFailures; @@ -165,4 +166,9 @@ public void incrReadsFromClient(boolean local) { public void incrVolumeFailures() { volumeFailures.incr(); } + + /** Increment for getBlockLocalPathInfo calls */ + public void incrBlocksGetLocalPathInfo() { + blocksGetLocalPathInfo.incr(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index d7f063b1c6..6673bf547b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.security.token.Token; import org.junit.Test; @@ -209,6 +210,7 @@ public void testReadFromOneDN() throws IOException { MockGetBlockReader answer = new MockGetBlockReader(); Mockito.doAnswer(answer).when(in).getBlockReader( (InetSocketAddress) Matchers.anyObject(), + (DatanodeInfo) Matchers.anyObject(), Matchers.anyString(), (ExtendedBlock) Matchers.anyObject(), (Token) Matchers.anyObject(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java new file mode 100644 index 0000000000..d14b05d076 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertTrue; + +import java.io.EOFException; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for short circuit read functionality using {@link BlockReaderLocal}. + * When a block is being read by a client is on the local datanode, instead of + * using {@link DataTransferProtocol} and connect to datanode, the short circuit + * read allows reading the file directly from the files on the local file + * system. + */ +public class TestShortCircuitLocalRead { + + static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/"; + + static final long seed = 0xDEADBEEFL; + static final int blockSize = 5120; + boolean simulatedStorage = false; + + // creates a file but does not close it + static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) + throws IOException { + FSDataOutputStream stm = fileSys.create(name, true, + fileSys.getConf().getInt("io.file.buffer.size", 4096), + (short)repl, (long)blockSize); + return stm; + } + + static private void checkData(byte[] actual, int from, byte[] expected, + String message) { + checkData(actual, from, expected, actual.length, message); + } + + static private void checkData(byte[] actual, int from, byte[] expected, int len, + String message) { + for (int idx = 0; idx < len; idx++) { + if (expected[from + idx] != actual[idx]) { + Assert.fail(message + " byte " + (from + idx) + " differs. expected " + + expected[from + idx] + " actual " + actual[idx]); + } + } + } + + static void checkFileContent(FileSystem fs, Path name, byte[] expected, + int readOffset) throws IOException { + FSDataInputStream stm = fs.open(name); + byte[] actual = new byte[expected.length-readOffset]; + stm.readFully(readOffset, actual); + checkData(actual, readOffset, expected, "Read 2"); + stm.close(); + // Now read using a different API. + actual = new byte[expected.length-readOffset]; + stm = fs.open(name); + long skipped = stm.skip(readOffset); + Assert.assertEquals(skipped, readOffset); + //Read a small number of bytes first. 
+ int nread = stm.read(actual, 0, 3); + nread += stm.read(actual, nread, 2); + //Read across chunk boundary + nread += stm.read(actual, nread, 517); + checkData(actual, readOffset, expected, nread, "A few bytes"); + //Now read rest of it + while (nread < actual.length) { + int nbytes = stm.read(actual, nread, actual.length - nread); + if (nbytes < 0) { + throw new EOFException("End of file reached before reading fully."); + } + nread += nbytes; + } + checkData(actual, readOffset, expected, "Read 3"); + stm.close(); + } + + /** + * Test that file data can be read by reading the block file + * directly from the local store. + */ + public void doTestShortCircuitRead(boolean ignoreChecksum, int size, + int readOffset) throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, + ignoreChecksum); + conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, + UserGroupInformation.getCurrentUser().getShortUserName()); + if (simulatedStorage) { + conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); + } + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + .format(true).build(); + FileSystem fs = cluster.getFileSystem(); + try { + // check that / exists + Path path = new Path("/"); + assertTrue("/ should be a directory", fs.getFileStatus(path) + .isDirectory() == true); + + byte[] fileData = AppendTestUtil.randomBytes(seed, size); + // create a new file in home directory. Do not close it. + Path file1 = new Path("filelocal.dat"); + FSDataOutputStream stm = createFile(fs, file1, 1); + + // write to file + stm.write(fileData); + stm.close(); + checkFileContent(fs, file1, fileData, readOffset); + } finally { + fs.close(); + cluster.shutdown(); + } + } + + @Test + public void testFileLocalReadNoChecksum() throws IOException { + doTestShortCircuitRead(true, 3*blockSize+100, 0); + } + + @Test + public void testFileLocalReadChecksum() throws IOException { + doTestShortCircuitRead(false, 3*blockSize+100, 0); + } + + @Test + public void testSmallFileLocalRead() throws IOException { + doTestShortCircuitRead(false, 13, 0); + doTestShortCircuitRead(false, 13, 5); + doTestShortCircuitRead(true, 13, 0); + doTestShortCircuitRead(true, 13, 5); + } + + @Test + public void testReadFromAnOffset() throws IOException { + doTestShortCircuitRead(false, 3*blockSize+100, 777); + doTestShortCircuitRead(true, 3*blockSize+100, 777); + } + + @Test + public void testLongFile() throws IOException { + doTestShortCircuitRead(false, 10*blockSize+100, 777); + doTestShortCircuitRead(true, 10*blockSize+100, 777); + } + + @Test + public void testGetBlockLocalPathInfo() throws IOException, InterruptedException { + final Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser"); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + .format(true).build(); + cluster.waitActive(); + final DataNode dn = cluster.getDataNodes().get(0); + FileSystem fs = cluster.getFileSystem(); + try { + DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23); + UserGroupInformation aUgi = UserGroupInformation + .createRemoteUser("alloweduser"); + LocatedBlocks lb = cluster.getNameNode().getRpcServer() + .getBlockLocations("/tmp/x", 0, 16); + // Create a new block object, because the block inside LocatedBlock at + // namenode is of type BlockInfo. 
+      ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
+      Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
+      final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
+      ClientDatanodeProtocol proxy = aUgi
+          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+            @Override
+            public ClientDatanodeProtocol run() throws Exception {
+              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
+                  60000);
+            }
+          });
+
+      //This should succeed
+      BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
+      Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
+          blpi.getBlockPath());
+
+      // Now try with a user that is not allowed.
+      UserGroupInformation bUgi = UserGroupInformation
+          .createRemoteUser("notalloweduser");
+      proxy = bUgi
+          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+            @Override
+            public ClientDatanodeProtocol run() throws Exception {
+              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
+                  60000);
+            }
+          });
+      try {
+        proxy.getBlockLocalPathInfo(blk, token);
+        Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+            + " is not allowed to call getBlockLocalPathInfo");
+      } catch (IOException ex) {
+        Assert.assertTrue(ex.getMessage().contains(
+            "not allowed to call getBlockLocalPathInfo"));
+      }
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Benchmark that compares short circuit read with regular read, using the
+   * specified number of threads reading simultaneously.
+   *
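+   * The benchmark writes one large file up front, then each thread re-reads it
+   * a fixed number of times and the total elapsed wall-clock time is printed.
+   *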
+   * Run this using the following command:
+   * bin/hadoop --config confdir \
+   *   org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
+   *   shortcircuit skipChecksum threadCount
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 3) {
+      System.out.println("Usage: test shortcircuit skipChecksum threadCount");
+      System.exit(1);
+    }
+    boolean shortcircuit = Boolean.valueOf(args[0]);
+    boolean skipChecksum = Boolean.valueOf(args[1]);
+    int threadCount = Integer.valueOf(args[2]);
+
+    // Set up the configuration and create the file to read
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        skipChecksum);
+
+    // Use a much larger file than the unit tests so the benchmark is meaningful
+    int fileSize = 1000 * blockSize + 100; // 1000 full blocks plus 100 bytes
+    final byte[] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
+
+    // create a new file in home directory and fill it with the test data
+    final Path file1 = new Path("filelocal.dat");
+    final FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream stm = createFile(fs, file1, 1);
+
+    stm.write(dataToWrite);
+    stm.close();
+
+    long start = System.currentTimeMillis();
+    final int iteration = 20;
+    Thread[] threads = new Thread[threadCount];
+    for (int i = 0; i < threadCount; i++) {
+      threads[i] = new Thread() {
+        public void run() {
+          for (int i = 0; i < iteration; i++) {
+            try {
+              checkFileContent(fs, file1, dataToWrite, 0);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+    }
+    for (int i = 0; i < threadCount; i++) {
+      threads[i].start();
+    }
+    for (int i = 0; i < threadCount; i++) {
+      threads[i].join();
+    }
+    long end = System.currentTimeMillis();
+    System.out.println(iteration + " iterations took " + (end - start) + " ms");
+    fs.delete(file1, false);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 2fc86ddad5..9dbcc2f736 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice;
@@ -991,4 +992,10 @@ public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
     }
     return r;
   }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b)
+      throws IOException {
+    throw new IOException("getBlockLocalPathInfo not supported.");
+  }
 }
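Note: the following is a minimal, hypothetical client-side sketch (not part of the patch) of how an application running on a datanode could opt into the short circuit read path exercised by the tests above. It only sets the client configuration keys introduced here; the datanode must additionally list the client user under DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY. The class name ShortCircuitReadClient and the path /benchmarks/sample.dat are illustrative assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Hypothetical usage sketch; not part of the HDFS-2246 change itself.
public class ShortCircuitReadClient {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Let the DFS client read block files straight from the local file system
    // when the replica lives on the same node as this process.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    // Keep client-side checksum verification enabled on the local read path.
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);

    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path("/benchmarks/sample.dat"));
    try {
      byte[] buf = new byte[64 * 1024];
      long total = 0;
      int n;
      while ((n = in.read(buf)) != -1) {
        total += n;
      }
      System.out.println("Read " + total + " bytes");
    } finally {
      in.close();
      fs.close();
    }
  }
}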