HDFS-16499. [SPS]: Should not start indefinitely while another SPS process is running (#4058)
This commit is contained in:
parent
a237526988
commit
7f6a891f03
@ -38,6 +38,7 @@
|
|||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -96,20 +97,25 @@ private static void secureLogin(Configuration conf)
|
|||||||
socAddr.getHostName());
|
socAddr.getHostName());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static NameNodeConnector getNameNodeConnector(Configuration conf)
|
public static NameNodeConnector getNameNodeConnector(Configuration conf)
|
||||||
throws IOException, InterruptedException {
|
throws InterruptedException {
|
||||||
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
|
final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
|
||||||
|
String serverName = ExternalStoragePolicySatisfier.class.getSimpleName();
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
final List<NameNodeConnector> nncs = NameNodeConnector
|
final List<NameNodeConnector> nncs = NameNodeConnector
|
||||||
.newNameNodeConnectors(namenodes,
|
.newNameNodeConnectors(namenodes,
|
||||||
ExternalStoragePolicySatisfier.class.getSimpleName(),
|
serverName,
|
||||||
externalSPSPathId, conf,
|
externalSPSPathId, conf,
|
||||||
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
|
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
|
||||||
return nncs.get(0);
|
return nncs.get(0);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to connect with namenode", e);
|
LOG.warn("Failed to connect with namenode", e);
|
||||||
|
if (e.getMessage().equals("Another " + serverName + " is running.")) {
|
||||||
|
ExitUtil.terminate(-1,
|
||||||
|
"Exit immediately because another " + serverName + " is running");
|
||||||
|
}
|
||||||
Thread.sleep(3000); // retry the connection after few secs
|
Thread.sleep(3000); // retry the connection after few secs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -90,6 +90,8 @@
|
|||||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -197,9 +199,24 @@ private void createCluster() throws IOException {
|
|||||||
writeContent(FILE);
|
writeContent(FILE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createCluster(boolean createMoverPath) throws IOException {
|
||||||
|
getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
|
||||||
|
setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
|
||||||
|
STORAGES_PER_DATANODE, CAPACITY, createMoverPath));
|
||||||
|
getFS();
|
||||||
|
writeContent(FILE);
|
||||||
|
}
|
||||||
|
|
||||||
private MiniDFSCluster startCluster(final Configuration conf,
|
private MiniDFSCluster startCluster(final Configuration conf,
|
||||||
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
|
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
|
||||||
long nodeCapacity) throws IOException {
|
long nodeCapacity) throws IOException {
|
||||||
|
return startCluster(conf, storageTypes, numberOfDatanodes, storagesPerDn,
|
||||||
|
nodeCapacity, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private MiniDFSCluster startCluster(final Configuration conf,
|
||||||
|
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
|
||||||
|
long nodeCapacity, boolean createMoverPath) throws IOException {
|
||||||
long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
|
long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
|
||||||
for (int i = 0; i < numberOfDatanodes; i++) {
|
for (int i = 0; i < numberOfDatanodes; i++) {
|
||||||
for (int j = 0; j < storagesPerDn; j++) {
|
for (int j = 0; j < storagesPerDn; j++) {
|
||||||
@ -212,7 +229,7 @@ private MiniDFSCluster startCluster(final Configuration conf,
|
|||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
nnc = DFSTestUtil.getNameNodeConnector(getConf(),
|
nnc = DFSTestUtil.getNameNodeConnector(getConf(),
|
||||||
HdfsServerConstants.MOVER_ID_PATH, 1, false);
|
HdfsServerConstants.MOVER_ID_PATH, 1, createMoverPath);
|
||||||
|
|
||||||
externalSps = new StoragePolicySatisfier(getConf());
|
externalSps = new StoragePolicySatisfier(getConf());
|
||||||
externalCtxt = new ExternalSPSContext(externalSps, nnc);
|
externalCtxt = new ExternalSPSContext(externalSps, nnc);
|
||||||
@ -428,6 +445,30 @@ public void testWhenStoragePolicySetToCOLD()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testInfiniteStartWhenAnotherSPSRunning()
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Create cluster and create mover path when get NameNodeConnector.
|
||||||
|
createCluster(true);
|
||||||
|
|
||||||
|
// Disable system exit for assert.
|
||||||
|
ExitUtil.disableSystemExit();
|
||||||
|
|
||||||
|
// Get NameNodeConnector one more time to simulate starting other sps process.
|
||||||
|
// It should exit immediately when another sps is running.
|
||||||
|
LambdaTestUtils.intercept(ExitUtil.ExitException.class,
|
||||||
|
"Exit immediately because another ExternalStoragePolicySatisfier is running",
|
||||||
|
() -> ExternalStoragePolicySatisfier.getNameNodeConnector(config));
|
||||||
|
} finally {
|
||||||
|
// Reset first exit exception to avoid AssertionError in MiniDFSCluster#shutdown.
|
||||||
|
// This has no effect on functionality.
|
||||||
|
ExitUtil.resetFirstExitException();
|
||||||
|
shutdownCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
|
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
|
||||||
// Change policy to COLD
|
// Change policy to COLD
|
||||||
dfs.setStoragePolicy(new Path(FILE), COLD);
|
dfs.setStoragePolicy(new Path(FILE), COLD);
|
||||||
|
Loading…
Reference in New Issue
Block a user