HDFS-14442. Disagreement between HAUtil.getAddressOfActive and RpcInvocationHandler.getConnectionId. Contributed by Ravuri Sushma sree.
This commit is contained in:
parent
5ead9c15ca
commit
f736408a83
@ -43,6 +43,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
|
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
@ -258,14 +259,29 @@ public static boolean useLogicalUri(Configuration conf, URI nameNodeUri)
|
|||||||
*/
|
*/
|
||||||
public static InetSocketAddress getAddressOfActive(FileSystem fs)
|
public static InetSocketAddress getAddressOfActive(FileSystem fs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
InetSocketAddress inAddr = null;
|
||||||
if (!(fs instanceof DistributedFileSystem)) {
|
if (!(fs instanceof DistributedFileSystem)) {
|
||||||
throw new IllegalArgumentException("FileSystem " + fs + " is not a DFS.");
|
throw new IllegalArgumentException("FileSystem " + fs + " is not a DFS.");
|
||||||
}
|
}
|
||||||
// force client address resolution.
|
// force client address resolution.
|
||||||
fs.exists(new Path("/"));
|
fs.exists(new Path("/"));
|
||||||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||||
DFSClient dfsClient = dfs.getClient();
|
Configuration dfsConf = dfs.getConf();
|
||||||
return RPC.getServerAddress(dfsClient.getNamenode());
|
URI dfsUri = dfs.getUri();
|
||||||
|
String nsId = dfsUri.getHost();
|
||||||
|
if (isHAEnabled(dfsConf, nsId)) {
|
||||||
|
List<ClientProtocol> namenodes =
|
||||||
|
getProxiesForAllNameNodesInNameservice(dfsConf, nsId);
|
||||||
|
for (ClientProtocol proxy : namenodes) {
|
||||||
|
if (proxy.getHAServiceState().equals(HAServiceState.ACTIVE)) {
|
||||||
|
inAddr = RPC.getServerAddress(proxy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DFSClient dfsClient = dfs.getClient();
|
||||||
|
inAddr = RPC.getServerAddress(dfsClient.getNamenode());
|
||||||
|
}
|
||||||
|
return inAddr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,8 +33,8 @@
|
|||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
@ -42,6 +42,7 @@
|
|||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.ha.ServiceFailedException;
|
import org.apache.hadoop.ha.ServiceFailedException;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
@ -52,6 +53,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
|
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
|
||||||
import org.apache.hadoop.hdfs.tools.GetGroups;
|
import org.apache.hadoop.hdfs.tools.GetGroups;
|
||||||
@ -88,7 +90,7 @@ public static void startUpCluster() throws Exception {
|
|||||||
// Observer and immediately try to read from it.
|
// Observer and immediately try to read from it.
|
||||||
conf.setTimeDuration(
|
conf.setTimeDuration(
|
||||||
OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
|
OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
|
||||||
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, true);
|
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true);
|
||||||
dfsCluster = qjmhaCluster.getDfsCluster();
|
dfsCluster = qjmhaCluster.getDfsCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -469,6 +471,25 @@ public void testStickyActive() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsckDelete() throws Exception {
|
||||||
|
setObserverRead(true);
|
||||||
|
DFSTestUtil.createFile(dfs, testPath, 512, (short) 1, 0);
|
||||||
|
DFSTestUtil.waitForReplication(dfs, testPath, (short) 1, 5000);
|
||||||
|
ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, testPath);
|
||||||
|
int dnToCorrupt = DFSTestUtil.firstDnWithBlock(dfsCluster, block);
|
||||||
|
FSNamesystem ns = dfsCluster.getNameNode(0).getNamesystem();
|
||||||
|
// corrupt Replicas are detected on restarting datanode
|
||||||
|
dfsCluster.corruptReplica(dnToCorrupt, block);
|
||||||
|
dfsCluster.restartDataNode(dnToCorrupt);
|
||||||
|
DFSTestUtil.waitCorruptReplicas(dfs, ns, testPath, block, 1);
|
||||||
|
final String result = TestFsck.runFsck(conf, 1, true, "/", "-delete");
|
||||||
|
// filesystem should be in corrupt state
|
||||||
|
LOG.info("result=" + result);
|
||||||
|
assertTrue(result.contains("The filesystem under path '/' is CORRUPT"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void assertSentTo(int nnIdx) throws IOException {
|
private void assertSentTo(int nnIdx) throws IOException {
|
||||||
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
||||||
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
||||||
|
Loading…
Reference in New Issue
Block a user