diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java index 15cdc6eb47..efbee7c030 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java @@ -38,6 +38,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,20 +97,25 @@ private static void secureLogin(Configuration conf) socAddr.getHostName()); } - private static NameNodeConnector getNameNodeConnector(Configuration conf) - throws IOException, InterruptedException { + public static NameNodeConnector getNameNodeConnector(Configuration conf) + throws InterruptedException { final Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH; + String serverName = ExternalStoragePolicySatisfier.class.getSimpleName(); while (true) { try { final List nncs = NameNodeConnector .newNameNodeConnectors(namenodes, - ExternalStoragePolicySatisfier.class.getSimpleName(), + serverName, externalSPSPathId, conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS); return nncs.get(0); } catch (IOException e) { LOG.warn("Failed to connect with namenode", e); + if (e.getMessage().equals("Another " + serverName + " is running.")) { + ExitUtil.terminate(-1, + "Exit immediately because another " + serverName + " is running"); + } Thread.sleep(3000); // retry the connection after few secs } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index 77922a0991..361d61d54e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -90,6 +90,8 @@ import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.ExitUtil; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -197,9 +199,24 @@ private void createCluster() throws IOException { writeContent(FILE); } + private void createCluster(boolean createMoverPath) throws IOException { + getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, CAPACITY, createMoverPath)); + getFS(); + writeContent(FILE); + } + private MiniDFSCluster startCluster(final Configuration conf, StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn, long nodeCapacity) throws IOException { + return startCluster(conf, storageTypes, numberOfDatanodes, storagesPerDn, + nodeCapacity, false); + } + + private MiniDFSCluster startCluster(final Configuration conf, + StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn, + long nodeCapacity, boolean createMoverPath) throws IOException { long[][] capacities = new long[numberOfDatanodes][storagesPerDn]; for (int i = 0; i < numberOfDatanodes; i++) { for (int j = 0; j < storagesPerDn; j++) { @@ -212,7 +229,7 @@ private MiniDFSCluster startCluster(final Configuration conf, cluster.waitActive(); nnc = DFSTestUtil.getNameNodeConnector(getConf(), - HdfsServerConstants.MOVER_ID_PATH, 1, false); + HdfsServerConstants.MOVER_ID_PATH, 1, createMoverPath); externalSps = new StoragePolicySatisfier(getConf()); externalCtxt = new ExternalSPSContext(externalSps, nnc); @@ -428,6 +445,30 @@ public void testWhenStoragePolicySetToCOLD() } } + @Test(timeout = 300000) + public void testInfiniteStartWhenAnotherSPSRunning() + throws Exception { + + try { + // Create cluster and create mover path when get NameNodeConnector. + createCluster(true); + + // Disable system exit for assert. + ExitUtil.disableSystemExit(); + + // Get NameNodeConnector one more time to simulate starting other sps process. + // It should exit immediately when another sps is running. + LambdaTestUtils.intercept(ExitUtil.ExitException.class, + "Exit immediately because another ExternalStoragePolicySatisfier is running", + () -> ExternalStoragePolicySatisfier.getNameNodeConnector(config)); + } finally { + // Reset first exit exception to avoid AssertionError in MiniDFSCluster#shutdown. + // This has no effect on functionality. + ExitUtil.resetFirstExitException(); + shutdownCluster(); + } + } + private void doTestWhenStoragePolicySetToCOLD() throws Exception { // Change policy to COLD dfs.setStoragePolicy(new Path(FILE), COLD);