diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index fd63b698c9..79275b0cdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -258,14 +259,29 @@ public static boolean useLogicalUri(Configuration conf, URI nameNodeUri) */ public static InetSocketAddress getAddressOfActive(FileSystem fs) throws IOException { + InetSocketAddress inAddr = null; if (!(fs instanceof DistributedFileSystem)) { throw new IllegalArgumentException("FileSystem " + fs + " is not a DFS."); } // force client address resolution. fs.exists(new Path("/")); DistributedFileSystem dfs = (DistributedFileSystem) fs; - DFSClient dfsClient = dfs.getClient(); - return RPC.getServerAddress(dfsClient.getNamenode()); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + String nsId = dfsUri.getHost(); + if (isHAEnabled(dfsConf, nsId)) { + List namenodes = + getProxiesForAllNameNodesInNameservice(dfsConf, nsId); + for (ClientProtocol proxy : namenodes) { + if (proxy.getHAServiceState().equals(HAServiceState.ACTIVE)) { + inAddr = RPC.getServerAddress(proxy); + } + } + } else { + DFSClient dfsClient = dfs.getClient(); + inAddr = RPC.getServerAddress(dfsClient.getNamenode()); + } + return inAddr; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index d11093f1d6..ed05abacf1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -33,8 +33,8 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; - import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; @@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.TestFsck; import org.apache.hadoop.hdfs.tools.GetGroups; @@ -88,7 +90,7 @@ public static void startUpCluster() throws Exception { // Observer and immediately try to read from it. conf.setTimeDuration( OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); - qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true); + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true); dfsCluster = qjmhaCluster.getDfsCluster(); } @@ -469,6 +471,25 @@ public void testStickyActive() throws Exception { } } + @Test + public void testFsckDelete() throws Exception { + setObserverRead(true); + DFSTestUtil.createFile(dfs, testPath, 512, (short) 1, 0); + DFSTestUtil.waitForReplication(dfs, testPath, (short) 1, 5000); + ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, testPath); + int dnToCorrupt = DFSTestUtil.firstDnWithBlock(dfsCluster, block); + FSNamesystem ns = dfsCluster.getNameNode(0).getNamesystem(); + // corrupt Replicas are detected on restarting datanode + dfsCluster.corruptReplica(dnToCorrupt, block); + dfsCluster.restartDataNode(dnToCorrupt); + DFSTestUtil.waitCorruptReplicas(dfs, ns, testPath, block, 1); + final String result = TestFsck.runFsck(conf, 1, true, "/", "-delete"); + // filesystem should be in corrupt state + LOG.info("result=" + result); + assertTrue(result.contains("The filesystem under path '/' is CORRUPT")); + } + + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));