diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6823c1f4a4..b6b11ee139 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -29,6 +29,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -72,10 +73,12 @@
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
+import org.apache.hadoop.util.StopWatch;
 import org.apache.htrace.core.SpanId;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
@@ -357,12 +360,18 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
     int replicaNotFoundCount = locatedblock.getLocations().length;
 
     final DfsClientConf conf = dfsClient.getConf();
-    for(DatanodeInfo datanode : locatedblock.getLocations()) {
+    final int timeout = conf.getSocketTimeout();
+    LinkedList<DatanodeInfo> nodeList = new LinkedList<DatanodeInfo>(
+        Arrays.asList(locatedblock.getLocations()));
+    LinkedList<DatanodeInfo> retryList = new LinkedList<DatanodeInfo>();
+    boolean isRetry = false;
+    StopWatch sw = new StopWatch();
+    while (nodeList.size() > 0) {
+      DatanodeInfo datanode = nodeList.pop();
       ClientDatanodeProtocol cdp = null;
-
       try {
         cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
-            dfsClient.getConfiguration(), conf.getSocketTimeout(),
+            dfsClient.getConfiguration(), timeout,
             conf.isConnectToDnViaHostname(), locatedblock);
 
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -370,15 +379,19 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
         if (n >= 0) {
           return n;
         }
-      }
-      catch(IOException ioe) {
-        if (ioe instanceof RemoteException &&
-            (((RemoteException) ioe).unwrapRemoteException() instanceof
-                ReplicaNotFoundException)) {
-          // special case : replica might not be on the DN, treat as 0 length
-          replicaNotFoundCount--;
+      } catch (IOException ioe) {
+        if (ioe instanceof RemoteException) {
+          if (((RemoteException) ioe).unwrapRemoteException() instanceof
+              ReplicaNotFoundException) {
+            // replica is not on the DN. We will treat it as 0 length
+            // if no one actually has a replica.
+            replicaNotFoundCount--;
+          } else if (((RemoteException) ioe).unwrapRemoteException() instanceof
+              RetriableException) {
+            // add to the list to be retried if necessary.
+            retryList.add(datanode);
+          }
         }
-
         DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
               + " for block {}", datanode, locatedblock.getBlock(), ioe);
       } finally {
@@ -386,6 +399,30 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
           RPC.stopProxy(cdp);
         }
       }
+
+      // Ran out of nodes, but there are retriable nodes.
+      if (nodeList.size() == 0 && retryList.size() > 0) {
+        nodeList.addAll(retryList);
+        retryList.clear();
+        isRetry = true;
+      }
+
+      if (isRetry) {
+        // start the stop watch if not already running.
+        if (!sw.isRunning()) {
+          sw.start();
+        }
+        try {
+          Thread.sleep(500); // delay between retries.
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted while getting the length.");
+        }
+      }
+
+      // see if we ran out of retry time
+      if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeout) {
+        break;
+      }
     }
 
     // Namenode told us about these locations, but none know about the replica
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c451ea6d67..887ddef6a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2581,6 +2581,8 @@ Release 2.7.3 - UNRELEASED
     HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
     retry policy. (Eric Payne via kihwal)
 
+    HDFS-9574. Reduce client failures during datanode restart (kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8707065e97..22859ee3fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -507,6 +507,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
   public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
   public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class";
+  public static final String DFS_DATANODE_BP_READY_TIMEOUT_KEY = "dfs.datanode.bp-ready.timeout";
+  public static final long DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT = 20;
   public static final String DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
   public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index b3cb48b595..0f84fc5350 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -52,6 +52,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -104,6 +106,8 @@ public class DNConf {
 
   final long maxLockedMemory;
 
+  private final long bpReadyTimeout;
+
   // Allow LAZY_PERSIST writes from non-local clients?
   private final boolean allowNonLocalLazyPersist;
 
@@ -210,6 +214,10 @@ public DNConf(Configuration conf) {
     this.allowNonLocalLazyPersist = conf.getBoolean(
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
+
+    this.bpReadyTimeout = conf.getLong(
+        DFS_DATANODE_BP_READY_TIMEOUT_KEY,
+        DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT);
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -322,4 +330,8 @@ public int getTransferSocketRecvBufferSize() {
   public int getTransferSocketSendBufferSize() {
     return transferSocketSendBufferSize;
   }
+
+  public long getBpReadyTimeout() {
+    return bpReadyTimeout;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 241f1e5657..6cd47ae44d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1594,6 +1594,7 @@ public int getIpcPort() {
   @VisibleForTesting
   public DatanodeRegistration getDNRegistrationForBP(String bpid)
   throws IOException {
+    DataNodeFaultInjector.get().noRegistration();
     BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos==null || bpos.bpRegistration==null) {
       throw new IOException("cannot find BPOfferService for bpid="+bpid);
@@ -1721,7 +1722,6 @@ FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
       throw new ShortCircuitFdsUnsupportedException(
           fileDescriptorPassingDisabledReason);
     }
-    checkBlockToken(blk, token, BlockTokenIdentifier.AccessMode.READ);
     int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
     if (maxVersion < blkVersion) {
       throw new ShortCircuitFdsVersionException("Your client is too old " +
@@ -2709,6 +2709,15 @@ public long getReplicaVisibleLength(final ExtendedBlock block) throws IOExceptio
   }
 
   private void checkReadAccess(final ExtendedBlock block) throws IOException {
+    // Make sure this node has registered for the block pool.
+    try {
+      getDNRegistrationForBP(block.getBlockPoolId());
+    } catch (IOException e) {
+      // if it has not registered with the NN, throw an exception back.
+      throw new org.apache.hadoop.ipc.RetriableException(
+          "Datanode not registered. Try again later.");
Try again later."); + } + if (isBlockTokenEnabled) { Set tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 46ec3ae359..0e3869429d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -49,4 +49,6 @@ public void sendShortCircuitShmResponse() throws IOException {} public boolean dropHeartbeatPacket() { return false; } + + public void noRegistration() throws IOException { } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 94ce6360b6..190e69c34a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -45,6 +45,7 @@ import java.nio.channels.ClosedChannelException; import java.security.MessageDigest; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -85,6 +86,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.StopWatch; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; @@ -298,6 +300,9 @@ public void requestShortCircuitFds(final ExtendedBlock blk, SlotId slotId, int maxVersion, boolean supportsReceiptVerification) throws IOException { updateCurrentThreadName("Passing file descriptors for block " + blk); + DataOutputStream out = getBufferedOutputStream(); + checkAccess(out, true, blk, token, + Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); FileInputStream fis[] = null; SlotId registeredSlotId = null; @@ -326,9 +331,6 @@ public void requestShortCircuitFds(final ExtendedBlock blk, } catch (ShortCircuitFdsUnsupportedException e) { bld.setStatus(ERROR_UNSUPPORTED); bld.setMessage(e.getMessage()); - } catch (InvalidToken e) { - bld.setStatus(ERROR_ACCESS_TOKEN); - bld.setMessage(e.getMessage()); } catch (IOException e) { bld.setStatus(ERROR); bld.setMessage(e.getMessage()); @@ -516,9 +518,9 @@ public void readBlock(final ExtendedBlock block, final CachingStrategy cachingStrategy) throws IOException { previousOpClientName = clientName; long read = 0; + updateCurrentThreadName("Sending block " + block); OutputStream baseStream = getOutputStream(); - DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - baseStream, smallBufferSize)); + DataOutputStream out = getBufferedOutputStream(); checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ); @@ -534,7 +536,6 @@ public void readBlock(final ExtendedBlock block, : dnR + " Served block " + block + " to " + remoteAddress; - updateCurrentThreadName("Sending block " + block); try { try { blockSender = new BlockSender(block, blockOffset, 
@@ -630,6 +631,10 @@ public void writeBlock(final ExtendedBlock block,
     allowLazyPersist = allowLazyPersist &&
         (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
     long size = 0;
+    // reply to upstream datanode or client
+    final DataOutputStream replyOut = getBufferedOutputStream();
+    checkAccess(replyOut, isClient, block, blockToken,
+        Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
     // check single target for transfer-RBW/Finalized
     if (isTransfer && targets.length > 0) {
       throw new IOException(stage + " does not support multiple targets "
@@ -660,11 +665,6 @@ public void writeBlock(final ExtendedBlock block,
     LOG.info("Receiving " + block + " src: " + remoteAddress
         + " dest: " + localAddress);
 
-    // reply to upstream datanode or client
-    final DataOutputStream replyOut = getBufferedOutputStream();
-    checkAccess(replyOut, isClient, block, blockToken,
-        Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
-
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
     Socket mirrorSock = null;           // socket to next target
@@ -863,13 +863,13 @@ public void transferBlock(final ExtendedBlock blk,
       final String clientName,
       final DatanodeInfo[] targets,
       final StorageType[] targetStorageTypes) throws IOException {
-    checkAccess(socketOut, true, blk, blockToken,
-        Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
+    checkAccess(out, true, blk, blockToken,
+        Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets,
           targetStorageTypes, clientName);
@@ -923,6 +923,7 @@ private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    updateCurrentThreadName("Getting checksum for block " + block);
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, block, blockToken,
@@ -933,13 +934,11 @@ public void blockChecksum(final ExtendedBlock block,
       long visibleLength = datanode.data.getReplicaVisibleLength(block);
       boolean partialBlk = requestLength < visibleLength;
 
-      updateCurrentThreadName("Reading metadata for block " + block);
       final LengthInputStream metadataIn = datanode.data
           .getMetaDataInputStream(block);
 
       final DataInputStream checksumIn = new DataInputStream(
           new BufferedInputStream(metadataIn, ioFileBufferSize));
-      updateCurrentThreadName("Getting checksum for block " + block);
       try {
         //read metadata file
         final BlockMetadataHeader header = BlockMetadataHeader
@@ -987,21 +986,10 @@ public void blockChecksum(final ExtendedBlock block,
   public void copyBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);
-    // Read in the header
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenIdentifier.AccessMode.COPY);
-      } catch (InvalidToken e) {
-        LOG.warn("Invalid access token in request from " + remoteAddress
-            + " for OP_COPY_BLOCK for block " + block + " : "
-            + e.getLocalizedMessage());
-        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
-        return;
-      }
+    DataOutputStream reply = getBufferedOutputStream();
+    checkAccess(reply, true, block, blockToken,
+        Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
 
-    }
-
     if (datanode.data.getPinning(block)) {
       String msg = "Not able to copy block " + block.getBlockId() + " " +
          "to " + peer.getRemoteAddressString() + " because it's pinned ";
@@ -1019,7 +1007,6 @@ public void copyBlock(final ExtendedBlock block,
     }
 
     BlockSender blockSender = null;
-    DataOutputStream reply = null;
     boolean isOpSuccess = true;
 
     try {
@@ -1027,10 +1014,7 @@ public void copyBlock(final ExtendedBlock block,
       blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
          null, CachingStrategy.newDropBehind());
 
-      // set up response stream
       OutputStream baseStream = getOutputStream();
-      reply = new DataOutputStream(new BufferedOutputStream(
-          baseStream, smallBufferSize));
 
       // send status first
       writeSuccessWithChecksumInfo(blockSender, reply);
@@ -1074,20 +1058,9 @@ public void replaceBlock(final ExtendedBlock block,
       final String delHint,
       final DatanodeInfo proxySource) throws IOException {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
-
-    /* read header */
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenIdentifier.AccessMode.REPLACE);
-      } catch (InvalidToken e) {
-        LOG.warn("Invalid access token in request from " + remoteAddress
-            + " for OP_REPLACE_BLOCK for block " + block + " : "
-            + e.getLocalizedMessage());
-        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
-        return;
-      }
-    }
+    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+    checkAccess(replyOut, true, block, blockToken,
+        Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE);
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to receive block " + block.getBlockId() +
@@ -1104,7 +1077,6 @@ public void replaceBlock(final ExtendedBlock block,
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     boolean IoeDuringCopyBlockOperation = false;
     try {
       // Move the block to different storage in the same datanode
@@ -1296,11 +1268,52 @@ private void incrDatanodeNetworkErrors() {
     datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
   }
 
+  /**
+   * Wait until the BP is registered, upto the configured amount of time.
+   * Throws an exception if times out, which should fail the client request.
+   * @param the requested block
+   */
+  void checkAndWaitForBP(final ExtendedBlock block)
+      throws IOException {
+    String bpId = block.getBlockPoolId();
+
+    // The registration is only missing in relatively short time window.
+    // Optimistically perform this first.
+    try {
+      datanode.getDNRegistrationForBP(bpId);
+      return;
+    } catch (IOException ioe) {
+      // not registered
+    }
+
+    // retry
+    long bpReadyTimeout = dnConf.getBpReadyTimeout();
+    StopWatch sw = new StopWatch();
+    sw.start();
+    while (sw.now(TimeUnit.SECONDS) <= bpReadyTimeout) {
+      try {
+        datanode.getDNRegistrationForBP(bpId);
+        return;
+      } catch (IOException ioe) {
+        // not registered
+      }
+      // sleep before trying again
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        throw new IOException("Interrupted while serving request. Aborting.");
+      }
+    }
+    // failed to obtain registration.
+ throw new IOException("Not ready to serve the block pool, " + bpId + "."); + } + private void checkAccess(OutputStream out, final boolean reply, final ExtendedBlock blk, final Token t, final Op op, final BlockTokenIdentifier.AccessMode mode) throws IOException { + checkAndWaitForBP(blk); if (datanode.isBlockTokenEnabled) { if (LOG.isDebugEnabled()) { LOG.debug("Checking block access token for block '" + blk.getBlockId() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 1fce33c3ef..397c67b948 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2721,4 +2721,14 @@ + + dfs.datanode.bp-ready.timeout + 20 + + The maximum wait time for datanode to be ready before failing the + received request. Setting this to 0 fails requests right away if the + datanode is not yet registered with the namenode. This wait time + reduces initial request failures after datanode restart. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java index d8a7188951..3af959c4c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.datatransfer.*; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.util.DataChecksum; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; @@ -162,17 +163,20 @@ private static Peer getMockPeer(PeerLocality locality) { return peer; } - private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) { + private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) + throws IOException { Configuration conf = new HdfsConfiguration(); conf.setBoolean( DFS_DATANODE_NON_LOCAL_LAZY_PERSIST, nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED); DNConf dnConf = new DNConf(conf); + DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class); DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class); DataNode mockDn = mock(DataNode.class); when(mockDn.getDnConf()).thenReturn(dnConf); when(mockDn.getConf()).thenReturn(conf); when(mockDn.getMetrics()).thenReturn(mockMetrics); + when(mockDn.getDNRegistrationForBP("Dummy-pool")).thenReturn(mockDnReg); return mockDn; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java index 8bbac9f2f0..40a3d9d744 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import 
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -146,4 +147,75 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
   private static FsDatasetImpl dataset(DataNode dn) {
     return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
   }
+
+  @Test
+  public void testWaitForRegistrationOnRestart() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 5);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+
+    // This makes the datanode appear registered to the NN, but it won't be
+    // able to get to the saved dn reg internally.
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override
+      public void noRegistration() throws IOException {
+        throw new IOException("no reg found for testing");
+      }
+    };
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(dnFaultInjector);
+    MiniDFSCluster cluster = null;
+    long start = 0;
+    Path file = new Path("/reg");
+    try {
+      int numDNs = 1;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+      cluster.waitActive();
+
+      start = System.currentTimeMillis();
+      FileSystem fileSys = cluster.getFileSystem();
+      try {
+        DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
+        // It is a bug if this does not fail.
+        throw new IOException("Did not fail!");
+      } catch (org.apache.hadoop.ipc.RemoteException e) {
+        long elapsed = System.currentTimeMillis() - start;
+        // timers have at-least semantics, so it should be at least 5 seconds.
+        if (elapsed < 5000 || elapsed > 10000) {
+          throw new IOException(elapsed + " seconds passed.", e);
+        }
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      // this should succeed now.
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
+
+      // turn it back to under-construction, so that the client calls
+      // getReplicaVisibleLength() rpc method against the datanode.
+      fileSys.append(file);
+      // back to simulating unregistered node.
+      DataNodeFaultInjector.set(dnFaultInjector);
+      byte[] buffer = new byte[8];
+      start = System.currentTimeMillis();
+      try {
+        fileSys.open(file).read(0L, buffer, 0, 1);
+        throw new IOException("Did not fail!");
+      } catch (IOException e) {
+        long elapsed = System.currentTimeMillis() - start;
+        if (e.getMessage().contains("readBlockLength")) {
+          throw new IOException("Failed, but with unexpected exception:", e);
+        }
+        // timers have at-least semantics, so it should be at least 5 seconds.
+        if (elapsed < 5000 || elapsed > 10000) {
+          throw new IOException(elapsed + " seconds passed.", e);
+        }
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      fileSys.open(file).read(0L, buffer, 0, 1);
+    } finally {
+      DataNodeFaultInjector.set(oldDnInjector);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
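
Deployment note (illustration, not part of the patch): the datanode-side wait introduced above is governed by dfs.datanode.bp-ready.timeout, whose key and 20-second default come from the DFSConfigKeys and hdfs-default.xml hunks in this diff; the DataXceiver code reads it in seconds. A minimal sketch of an operator override in hdfs-site.xml follows; the 30-second value is an arbitrary example chosen for illustration only.

  <!-- hdfs-site.xml (example): hold client operations for up to 30 seconds -->
  <!-- while a restarted datanode finishes re-registering with the namenode. -->
  <property>
    <name>dfs.datanode.bp-ready.timeout</name>
    <value>30</value>
  </property>

Setting the value to 0 keeps the old fail-fast behavior: requests that arrive before registration completes are rejected immediately, as described in the hdfs-default.xml entry above.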