From fda1221c55101d97ac62e1ee4e3ddf9a915d5363 Mon Sep 17 00:00:00 2001 From: Brahma Reddy Battula Date: Tue, 19 Sep 2017 11:25:45 +0530 Subject: [PATCH] HDFS-11799. Introduce a config to allow setting up write pipeline with fewer nodes than replication factor. Contributed by Brahma Reddy Battula --- .../org/apache/hadoop/hdfs/DFSClient.java | 13 +- .../org/apache/hadoop/hdfs/DataStreamer.java | 31 +- .../hdfs/client/HdfsClientConfigKeys.java | 2 + .../src/main/resources/hdfs-default.xml | 17 + ...TestReplaceDatanodeFailureReplication.java | 291 ++++++++++++++++++ .../hadoop/tools/TestHdfsConfigFields.java | 4 +- 6 files changed, 354 insertions(+), 4 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 772049d35d..7e8e95ba15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -223,6 +223,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final String clientName; final SocketFactory socketFactory; final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure; + final short dtpReplaceDatanodeOnFailureReplication; private final FileSystem.Statistics stats; private final URI namenodeUri; private final Random r = new Random(); @@ -305,7 +306,17 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); - + this.dtpReplaceDatanodeOnFailureReplication = (short) conf + .getInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION, + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION_DEFAULT); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Sets " + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION + " to " + + dtpReplaceDatanodeOnFailureReplication); + } this.ugi = UserGroupInformation.getCurrentUser(); this.namenodeUri = nameNodeUri; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 4eafca1164..99fa5f3e33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -1384,7 +1384,36 @@ private void addDatanode2ExistingPipeline() throws IOException { setPipeline(lb); //find the new datanode - final int d = findNewDatanode(original); + final int d; + try { + d = findNewDatanode(original); + } catch (IOException ioe) { + // check the minimal number of nodes available to decide whether to + // continue the write. + + //if live block location datanodes is greater than or equal to + // HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + // MIN_REPLICATION threshold value, continue writing to the + // remaining nodes. Otherwise throw exception. + // + // If HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + // MIN_REPLICATION is set to 0 or less than zero, an exception will be + // thrown if a replacement could not be found. + + if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && nodes.length + >= dfsClient.dtpReplaceDatanodeOnFailureReplication) { + DFSClient.LOG.warn( + "Failed to find a new datanode to add to the write pipeline, " + + " continue to write to the pipeline with " + nodes.length + + " nodes since it's no less than minimum replication: " + + dfsClient.dtpReplaceDatanodeOnFailureReplication + + " configured by " + + BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION + + ".", ioe); + return; + } + throw ioe; + } //transfer replica. pick a source from the original nodes final DatanodeInfo src = original[tried % original.length]; final DatanodeInfo[] targets = {nodes[d]}; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index e99b099935..97cb68bd59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -320,6 +320,8 @@ interface ReplaceDatanodeOnFailure { String POLICY_DEFAULT = "DEFAULT"; String BEST_EFFORT_KEY = PREFIX + "best-effort"; boolean BEST_EFFORT_DEFAULT = false; + String MIN_REPLICATION = PREFIX + "min-replication"; + short MIN_REPLICATION_DEFAULT = 0; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index af40a34ba7..9327a2c585 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -658,6 +658,23 @@ + + dfs.client.block.write.replace-datanode-on-failure.min-replication + 0 + + The minimum number of replications that are needed to not to fail + the write pipeline if new datanodes can not be found to replace + failed datanodes (could be due to network failure) in the write pipeline. + If the number of the remaining datanodes in the write pipeline is greater + than or equal to this property value, continue writing to the remaining nodes. + Otherwise throw exception. + + If this is set to 0, an exception will be thrown, when a replacement + can not be found. + See also dfs.client.block.write.replace-datanode-on-failure.policy + + + dfs.blockreport.intervalMsec 21600000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java new file mode 100644 index 0000000000..9591cb4347 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; +import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy; +import org.apache.hadoop.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Verify the behaviours of HdfsClientConfigKeys.BlockWrite. + * ReplaceDatanodeOnFailure.MIN_REPLICATION.if live block location datanodes is + * greater than or equal to + * 'dfs.client.block.write.replace-datanode-on-failure.min.replication' + * threshold value, if yes continue writing to the two remaining nodes. + * Otherwise it will throw exception. + *

+ * If this HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + * MIN_REPLICATION is set to 0 or less than zero, an exception will be thrown + * if a replacement could not be found. + */ +public class TestReplaceDatanodeFailureReplication { + static final Log LOG = LogFactory + .getLog(TestReplaceDatanodeFailureReplication.class); + + static final String DIR = + "/" + TestReplaceDatanodeFailureReplication.class.getSimpleName() + "/"; + static final short REPLICATION = 3; + final private static String RACK0 = "/rack0"; + + /** + * Test fail last datanode in the pipeline. + */ + @Test + public void testLastDatanodeFailureInPipeline() throws Exception { + testWriteFileAndVerifyAfterDNStop(2, 1, 10, false); + } + + /** + * Test fail first datanode in the pipeline. + */ + @Test + public void testFirstDatanodeFailureInPipeline() throws Exception { + testWriteFileAndVerifyAfterDNStop(2, 0, 10, false); + } + + /** + * Test fail all the datanodes except first in the pipeline. + */ + @Test + public void testWithOnlyFirstDatanodeIsAlive() throws Exception { + testWriteFileAndVerifyAfterDNStop(1, 1, 1, true); + } + + /** + * Test fail all the datanodes except lastnode in the pipeline. + */ + @Test + public void testWithOnlyLastDatanodeIsAlive() throws Exception { + testWriteFileAndVerifyAfterDNStop(1, 0, 1, true); + } + + /** + * Test when number of live nodes are less than the + * "dfs.client.block.write.replace-datanode-on-failure.min.replication". + */ + @Test + public void testLessNumberOfLiveDatanodesThanWriteReplaceDatanodeOnFailureRF() + throws Exception { + final MiniDFSCluster cluster = setupCluster(2); + + try { + final DistributedFileSystem fs = cluster.getFileSystem(); + final Path dir = new Path(DIR); + + final SlowWriter[] slowwriters = new SlowWriter[1]; + for (int i = 1; i <= slowwriters.length; i++) { + // create slow writers in different speed + slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), + i * 200L); + } + + for (SlowWriter s : slowwriters) { + s.start(); + } + + // Let slow writers write something. + // Some of them are too slow and will be not yet started. + sleepSeconds(1); + + // stop an old datanode + cluster.stopDataNode(0); + cluster.stopDataNode(0); + + // Let the slow writer writes a few more seconds + // Everyone should have written something. + sleepSeconds(20); + + // check replication and interrupt. + for (SlowWriter s : slowwriters) { + try { + s.out.getCurrentBlockReplication(); + Assert.fail( + "Must throw exception as failed to add a new datanode for write " + + "pipeline, minimum failure replication"); + } catch (IOException e) { + // expected + } + s.interruptRunning(); + } + + // close files + for (SlowWriter s : slowwriters) { + s.joinAndClose(); + } + + // Verify the file + verifyFileContent(fs, slowwriters); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private MiniDFSCluster setupCluster(int failRF) throws IOException { + final Configuration conf = new HdfsConfiguration(); + conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure. + MIN_REPLICATION, failRF); + // always replace a datanode + ReplaceDatanodeOnFailure.write(Policy.ALWAYS, false, conf); + + final String[] racks = new String[REPLICATION]; + Arrays.fill(racks, RACK0); + return new MiniDFSCluster.Builder(conf).racks(racks) + .numDataNodes(REPLICATION).build(); + } + + private void testWriteFileAndVerifyAfterDNStop(int failRF, int dnindex, + int slowWrites, boolean failPipeLine) + throws IOException, InterruptedException, TimeoutException { + final MiniDFSCluster cluster = setupCluster(failRF); + try { + final DistributedFileSystem fs = cluster.getFileSystem(); + final Path dir = new Path(DIR); + + final SlowWriter[] slowwriters = new SlowWriter[slowWrites]; + for (int i = 1; i <= slowwriters.length; i++) { + // create slow writers in different speed + slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), + i * 200L); + } + + for (SlowWriter s : slowwriters) { + s.start(); + } + + // Let slow writers write something. + // Some of them are too slow and will be not yet started. + sleepSeconds(3); + + // stop an datanode + cluster.stopDataNode(dnindex); + if (failPipeLine) { + cluster.stopDataNode(dnindex); + } + + // Let the slow writer writes a few more seconds + // Everyone should have written something. + sleepSeconds(5); + cluster.waitFirstBRCompleted(0, 10000); + // check replication and interrupt. + for (SlowWriter s : slowwriters) { + Assert.assertEquals(failRF, s.out.getCurrentBlockReplication()); + s.interruptRunning(); + } + + // close files + for (SlowWriter s : slowwriters) { + s.joinAndClose(); + } + + // Verify the file + verifyFileContent(fs, slowwriters); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void verifyFileContent(DistributedFileSystem fs, + SlowWriter[] slowwriters) throws IOException { + LOG.info("Verify the file"); + for (int i = 0; i < slowwriters.length; i++) { + LOG.info(slowwriters[i].filepath + ": length=" + fs + .getFileStatus(slowwriters[i].filepath).getLen()); + FSDataInputStream in = null; + try { + in = fs.open(slowwriters[i].filepath); + for (int j = 0, x;; j++) { + x = in.read(); + if ((x) != -1) { + Assert.assertEquals(j, x); + } else { + return; + } + } + } finally { + IOUtils.closeStream(in); + } + } + } + + static void sleepSeconds(final int waittime) throws InterruptedException { + LOG.info("Wait " + waittime + " seconds"); + Thread.sleep(waittime * 1000L); + } + + static class SlowWriter extends Thread { + private final Path filepath; + private final HdfsDataOutputStream out; + private final long sleepms; + private volatile boolean running = true; + + SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms) + throws IOException { + super(SlowWriter.class.getSimpleName() + ":" + filepath); + this.filepath = filepath; + this.out = (HdfsDataOutputStream) fs.create(filepath, REPLICATION); + this.sleepms = sleepms; + } + + @Override public void run() { + int i = 0; + try { + sleep(sleepms); + for (; running; i++) { + LOG.info(getName() + " writes " + i); + out.write(i); + out.hflush(); + sleep(sleepms); + } + } catch (InterruptedException e) { + LOG.info(getName() + " interrupted:" + e); + } catch (IOException e) { + throw new RuntimeException(getName(), e); + } finally { + LOG.info(getName() + " terminated: i=" + i); + } + } + + void interruptRunning() { + running = false; + interrupt(); + } + + void joinAndClose() throws InterruptedException { + LOG.info(getName() + " join and close"); + join(); + IOUtils.closeStream(out); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java index 233dc5aa51..47db565e37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java @@ -41,8 +41,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase { public void initializeMemberVariables() { xmlFilename = new String("hdfs-default.xml"); configurationClasses = new Class[] { HdfsClientConfigKeys.class, - HdfsClientConfigKeys.StripedRead.class, - DFSConfigKeys.class}; + HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class, + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class }; // Set error modes errorIfMissingConfigProps = true;