HDFS-16522. Set Http and Ipc ports for Datanodes in MiniDFSCluster (#4108)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
61bbdfd3a7
commit
7c20602b17
@ -203,6 +203,8 @@ public static class Builder {
|
|||||||
private int nameNodeHttpPort = 0;
|
private int nameNodeHttpPort = 0;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private int numDataNodes = 1;
|
private int numDataNodes = 1;
|
||||||
|
private int[] dnHttpPorts = null;
|
||||||
|
private int[] dnIpcPorts = null;
|
||||||
private StorageType[][] storageTypes = null;
|
private StorageType[][] storageTypes = null;
|
||||||
private StorageType[] storageTypes1D = null;
|
private StorageType[] storageTypes1D = null;
|
||||||
private int storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
|
private int storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
|
||||||
@ -277,6 +279,16 @@ public Builder numDataNodes(int val) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setDnHttpPorts(int... ports) {
|
||||||
|
this.dnHttpPorts = ports;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setDnIpcPorts(int... ports) {
|
||||||
|
this.dnIpcPorts = ports;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default: DEFAULT_STORAGES_PER_DATANODE
|
* Default: DEFAULT_STORAGES_PER_DATANODE
|
||||||
*/
|
*/
|
||||||
@ -599,7 +611,9 @@ protected MiniDFSCluster(Builder builder) throws IOException {
|
|||||||
builder.checkDataNodeHostConfig,
|
builder.checkDataNodeHostConfig,
|
||||||
builder.dnConfOverlays,
|
builder.dnConfOverlays,
|
||||||
builder.skipFsyncForTesting,
|
builder.skipFsyncForTesting,
|
||||||
builder.useConfiguredTopologyMappingClass);
|
builder.useConfiguredTopologyMappingClass,
|
||||||
|
builder.dnHttpPorts,
|
||||||
|
builder.dnIpcPorts);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DataNodeProperties {
|
public static class DataNodeProperties {
|
||||||
@ -873,7 +887,7 @@ public MiniDFSCluster(int nameNodePort,
|
|||||||
operation, null, racks, hosts,
|
operation, null, racks, hosts,
|
||||||
null, simulatedCapacities, null, true, false,
|
null, simulatedCapacities, null, true, false,
|
||||||
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0),
|
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0),
|
||||||
true, false, false, null, true, false);
|
true, false, false, null, true, false, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initMiniDFSCluster(
|
private void initMiniDFSCluster(
|
||||||
@ -891,7 +905,9 @@ private void initMiniDFSCluster(
|
|||||||
boolean checkDataNodeHostConfig,
|
boolean checkDataNodeHostConfig,
|
||||||
Configuration[] dnConfOverlays,
|
Configuration[] dnConfOverlays,
|
||||||
boolean skipFsyncForTesting,
|
boolean skipFsyncForTesting,
|
||||||
boolean useConfiguredTopologyMappingClass)
|
boolean useConfiguredTopologyMappingClass,
|
||||||
|
int[] dnHttpPorts,
|
||||||
|
int[] dnIpcPorts)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
@ -974,9 +990,9 @@ private void initMiniDFSCluster(
|
|||||||
|
|
||||||
// Start the DataNodes
|
// Start the DataNodes
|
||||||
startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs,
|
startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs,
|
||||||
dnStartOpt != null ? dnStartOpt : startOpt,
|
dnStartOpt != null ? dnStartOpt : startOpt, racks, hosts, storageCapacities,
|
||||||
racks, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
|
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig,
|
||||||
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
|
dnConfOverlays, dnHttpPorts, dnIpcPorts);
|
||||||
waitClusterUp();
|
waitClusterUp();
|
||||||
//make sure ProxyUsers uses the latest conf
|
//make sure ProxyUsers uses the latest conf
|
||||||
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
||||||
@ -1598,8 +1614,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
String[] racks, String[] hosts,
|
String[] racks, String[] hosts,
|
||||||
long[] simulatedCapacities,
|
long[] simulatedCapacities,
|
||||||
boolean setupHostsFile) throws IOException {
|
boolean setupHostsFile) throws IOException {
|
||||||
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
|
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts, null,
|
||||||
null, simulatedCapacities, setupHostsFile, false, false, null);
|
simulatedCapacities, setupHostsFile, false, false, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
||||||
@ -1608,8 +1624,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
long[] simulatedCapacities,
|
long[] simulatedCapacities,
|
||||||
boolean setupHostsFile,
|
boolean setupHostsFile,
|
||||||
boolean checkDataNodeAddrConfig) throws IOException {
|
boolean checkDataNodeAddrConfig) throws IOException {
|
||||||
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
|
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts, null,
|
||||||
null, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
|
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1625,6 +1641,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
* @param conf the base configuration to use in starting the DataNodes. This
|
* @param conf the base configuration to use in starting the DataNodes. This
|
||||||
* will be modified as necessary.
|
* will be modified as necessary.
|
||||||
* @param numDataNodes Number of DataNodes to start; may be zero
|
* @param numDataNodes Number of DataNodes to start; may be zero
|
||||||
|
* @param storageTypes Storage Types for DataNodes.
|
||||||
* @param manageDfsDirs if true, the data directories for DataNodes will be
|
* @param manageDfsDirs if true, the data directories for DataNodes will be
|
||||||
* created and {@link DFSConfigKeys#DFS_DATANODE_DATA_DIR_KEY} will be
|
* created and {@link DFSConfigKeys#DFS_DATANODE_DATA_DIR_KEY} will be
|
||||||
* set in the conf
|
* set in the conf
|
||||||
@ -1632,13 +1649,16 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
* or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
|
* or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
|
||||||
* @param racks array of strings indicating the rack that each DataNode is on
|
* @param racks array of strings indicating the rack that each DataNode is on
|
||||||
* @param hosts array of strings indicating the hostnames for each DataNode
|
* @param hosts array of strings indicating the hostnames for each DataNode
|
||||||
|
* @param storageCapacities array of Storage Capacities to be used while testing.
|
||||||
* @param simulatedCapacities array of capacities of the simulated data nodes
|
* @param simulatedCapacities array of capacities of the simulated data nodes
|
||||||
* @param setupHostsFile add new nodes to dfs hosts files
|
* @param setupHostsFile add new nodes to dfs hosts files
|
||||||
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
|
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
|
||||||
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
|
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
|
||||||
* @param dnConfOverlays An array of {@link Configuration} objects that will overlay the
|
* @param dnConfOverlays An array of {@link Configuration} objects that will overlay the
|
||||||
* global MiniDFSCluster Configuration for the corresponding DataNode.
|
* global MiniDFSCluster Configuration for the corresponding DataNode.
|
||||||
* @throws IllegalStateException if NameNode has been shutdown
|
* @param dnHttpPorts An array of Http ports if present, to be used for DataNodes.
|
||||||
|
* @param dnIpcPorts An array of Ipc ports if present, to be used for DataNodes.
|
||||||
|
* @throws IOException If the DFS daemons experience some issues.
|
||||||
*/
|
*/
|
||||||
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
||||||
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
|
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
|
||||||
@ -1648,7 +1668,9 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
boolean setupHostsFile,
|
boolean setupHostsFile,
|
||||||
boolean checkDataNodeAddrConfig,
|
boolean checkDataNodeAddrConfig,
|
||||||
boolean checkDataNodeHostConfig,
|
boolean checkDataNodeHostConfig,
|
||||||
Configuration[] dnConfOverlays) throws IOException {
|
Configuration[] dnConfOverlays,
|
||||||
|
int[] dnHttpPorts,
|
||||||
|
int[] dnIpcPorts) throws IOException {
|
||||||
assert storageCapacities == null || simulatedCapacities == null;
|
assert storageCapacities == null || simulatedCapacities == null;
|
||||||
assert storageTypes == null || storageTypes.length == numDataNodes;
|
assert storageTypes == null || storageTypes.length == numDataNodes;
|
||||||
assert storageCapacities == null || storageCapacities.length == numDataNodes;
|
assert storageCapacities == null || storageCapacities.length == numDataNodes;
|
||||||
@ -1656,6 +1678,19 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
if (operation == StartupOption.RECOVER) {
|
if (operation == StartupOption.RECOVER) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (dnHttpPorts != null && dnHttpPorts.length != numDataNodes) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Num of http ports (" + dnHttpPorts.length + ") should match num of DataNodes ("
|
||||||
|
+ numDataNodes + ")");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dnIpcPorts != null && dnIpcPorts.length != numDataNodes) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Num of ipc ports (" + dnIpcPorts.length + ") should match num of DataNodes ("
|
||||||
|
+ numDataNodes + ")");
|
||||||
|
}
|
||||||
|
|
||||||
if (checkDataNodeHostConfig) {
|
if (checkDataNodeHostConfig) {
|
||||||
conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
|
conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
|
||||||
} else {
|
} else {
|
||||||
@ -1711,7 +1746,15 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
dnConf.addResource(dnConfOverlays[i]);
|
dnConf.addResource(dnConfOverlays[i]);
|
||||||
}
|
}
|
||||||
// Set up datanode address
|
// Set up datanode address
|
||||||
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
|
int httpPort = 0;
|
||||||
|
int ipcPort = 0;
|
||||||
|
if(dnHttpPorts != null) {
|
||||||
|
httpPort = dnHttpPorts[i - curDatanodesNum];
|
||||||
|
}
|
||||||
|
if(dnIpcPorts != null) {
|
||||||
|
ipcPort = dnIpcPorts[i - curDatanodesNum];
|
||||||
|
}
|
||||||
|
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig, httpPort, ipcPort);
|
||||||
if (manageDfsDirs) {
|
if (manageDfsDirs) {
|
||||||
String dirs = makeDataNodeDirs(i, storageTypes == null ?
|
String dirs = makeDataNodeDirs(i, storageTypes == null ?
|
||||||
null : storageTypes[i - curDatanodesNum]);
|
null : storageTypes[i - curDatanodesNum]);
|
||||||
@ -3365,7 +3408,7 @@ public void setBlockRecoveryTimeout(long timeout) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
|
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
|
||||||
boolean checkDataNodeAddrConfig) throws IOException {
|
boolean checkDataNodeAddrConfig, int httpPort, int ipcPort) throws IOException {
|
||||||
if (setupHostsFile) {
|
if (setupHostsFile) {
|
||||||
String hostsFile = conf.get(DFS_HOSTS, "").trim();
|
String hostsFile = conf.get(DFS_HOSTS, "").trim();
|
||||||
if (hostsFile.length() == 0) {
|
if (hostsFile.length() == 0) {
|
||||||
@ -3388,11 +3431,11 @@ protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (checkDataNodeAddrConfig) {
|
if (checkDataNodeAddrConfig) {
|
||||||
conf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
|
conf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:" + httpPort);
|
||||||
conf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
|
conf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:" + ipcPort);
|
||||||
} else {
|
} else {
|
||||||
conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
|
conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:" + httpPort);
|
||||||
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
|
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:" + ipcPort);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
|
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
|
||||||
Configuration dnConf = new HdfsConfiguration(conf);
|
Configuration dnConf = new HdfsConfiguration(conf);
|
||||||
// Set up datanode address
|
// Set up datanode address
|
||||||
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
|
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig, 0, 0);
|
||||||
if (manageDfsDirs) {
|
if (manageDfsDirs) {
|
||||||
String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
|
String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
|
||||||
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
|
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
|
||||||
@ -235,7 +235,9 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
boolean setupHostsFile,
|
boolean setupHostsFile,
|
||||||
boolean checkDataNodeAddrConfig,
|
boolean checkDataNodeAddrConfig,
|
||||||
boolean checkDataNodeHostConfig,
|
boolean checkDataNodeHostConfig,
|
||||||
Configuration[] dnConfOverlays) throws IOException {
|
Configuration[] dnConfOverlays,
|
||||||
|
int[] dnHttpPorts,
|
||||||
|
int[] dnIpcPorts) throws IOException {
|
||||||
startDataNodes(conf, numDataNodes, storageTypes, manageDfsDirs, operation, racks,
|
startDataNodes(conf, numDataNodes, storageTypes, manageDfsDirs, operation, racks,
|
||||||
NODE_GROUPS, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
|
NODE_GROUPS, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
|
||||||
checkDataNodeAddrConfig, checkDataNodeHostConfig);
|
checkDataNodeAddrConfig, checkDataNodeHostConfig);
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
@ -26,6 +27,7 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
@ -38,9 +40,13 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
import org.apache.hadoop.test.PathUtils;
|
import org.apache.hadoop.test.PathUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.util.Preconditions;
|
import org.apache.hadoop.util.Preconditions;
|
||||||
|
|
||||||
@ -52,6 +58,8 @@
|
|||||||
*/
|
*/
|
||||||
public class TestMiniDFSCluster {
|
public class TestMiniDFSCluster {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestMiniDFSCluster.class);
|
||||||
|
|
||||||
private static final String CLUSTER_1 = "cluster1";
|
private static final String CLUSTER_1 = "cluster1";
|
||||||
private static final String CLUSTER_2 = "cluster2";
|
private static final String CLUSTER_2 = "cluster2";
|
||||||
private static final String CLUSTER_3 = "cluster3";
|
private static final String CLUSTER_3 = "cluster3";
|
||||||
@ -319,4 +327,88 @@ public void testSetUpFederatedCluster() throws Exception {
|
|||||||
cluster.restartNameNode(1);
|
cluster.restartNameNode(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// There is a possibility that this test might fail if any other concurrently running
|
||||||
|
// test could bind same port as one of the ports returned by NetUtils.getFreeSocketPorts(6)
|
||||||
|
// before datanodes are started.
|
||||||
|
@Test
|
||||||
|
public void testStartStopWithPorts() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
"Num of http ports (1) should match num of DataNodes (3)",
|
||||||
|
"MiniJournalCluster port validation failed",
|
||||||
|
() -> {
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(3).setDnHttpPorts(8481).build();
|
||||||
|
});
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
"Num of ipc ports (2) should match num of DataNodes (1)",
|
||||||
|
"MiniJournalCluster port validation failed",
|
||||||
|
() -> {
|
||||||
|
new MiniDFSCluster.Builder(conf).setDnIpcPorts(8481, 8482).build();
|
||||||
|
});
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
"Num of ipc ports (1) should match num of DataNodes (3)",
|
||||||
|
"MiniJournalCluster port validation failed",
|
||||||
|
() -> {
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(3).setDnHttpPorts(800, 9000, 10000)
|
||||||
|
.setDnIpcPorts(8481).build();
|
||||||
|
});
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
"Num of http ports (4) should match num of DataNodes (3)",
|
||||||
|
"MiniJournalCluster port validation failed",
|
||||||
|
() -> {
|
||||||
|
new MiniDFSCluster.Builder(conf).setDnHttpPorts(800, 9000, 1000, 2000)
|
||||||
|
.setDnIpcPorts(8481, 8482, 8483).numDataNodes(3).build();
|
||||||
|
});
|
||||||
|
|
||||||
|
final Set<Integer> httpAndIpcPorts = NetUtils.getFreeSocketPorts(6);
|
||||||
|
LOG.info("Free socket ports: {}", httpAndIpcPorts);
|
||||||
|
|
||||||
|
assertThat(httpAndIpcPorts).doesNotContain(0);
|
||||||
|
|
||||||
|
final int[] httpPorts = new int[3];
|
||||||
|
final int[] ipcPorts = new int[3];
|
||||||
|
int httpPortIdx = 0;
|
||||||
|
int ipcPortIdx = 0;
|
||||||
|
for (Integer httpAndIpcPort : httpAndIpcPorts) {
|
||||||
|
if (httpPortIdx < 3) {
|
||||||
|
httpPorts[httpPortIdx++] = httpAndIpcPort;
|
||||||
|
} else {
|
||||||
|
ipcPorts[ipcPortIdx++] = httpAndIpcPort;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Http ports selected: {}", httpPorts);
|
||||||
|
LOG.info("Ipc ports selected: {}", ipcPorts);
|
||||||
|
|
||||||
|
try (MiniDFSCluster miniDfsCluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.setDnHttpPorts(httpPorts)
|
||||||
|
.setDnIpcPorts(ipcPorts)
|
||||||
|
.numDataNodes(3).build()) {
|
||||||
|
miniDfsCluster.waitActive();
|
||||||
|
|
||||||
|
assertEquals(httpPorts[0],
|
||||||
|
miniDfsCluster.getDataNode(ipcPorts[0]).getInfoPort());
|
||||||
|
assertEquals(httpPorts[1],
|
||||||
|
miniDfsCluster.getDataNode(ipcPorts[1]).getInfoPort());
|
||||||
|
assertEquals(httpPorts[2],
|
||||||
|
miniDfsCluster.getDataNode(ipcPorts[2]).getInfoPort());
|
||||||
|
|
||||||
|
assertEquals(ipcPorts[0],
|
||||||
|
miniDfsCluster.getDataNode(ipcPorts[0]).getIpcPort());
|
||||||
|
assertEquals(ipcPorts[1],
|
||||||
|
miniDfsCluster.getDataNode(ipcPorts[1]).getIpcPort());
|
||||||
|
assertEquals(ipcPorts[2],
|
||||||
|
miniDfsCluster.getDataNode(ipcPorts[2]).getIpcPort());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -262,7 +262,7 @@ public void testBalancerWithRamDisk() throws Exception {
|
|||||||
long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
|
long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
|
||||||
diskStorageLimit}};
|
diskStorageLimit}};
|
||||||
cluster.startDataNodes(conf, replicationFactor, storageTypes, true, null,
|
cluster.startDataNodes(conf, replicationFactor, storageTypes, true, null,
|
||||||
null, null, storageCapacities, null, false, false, false, null);
|
null, null, storageCapacities, null, false, false, false, null, null, null);
|
||||||
|
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
@ -949,7 +949,7 @@ public void testMoverWithStripedFile() throws Exception {
|
|||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
|
||||||
true, null, null, null,capacities, null, false, false, false, null);
|
true, null, null, null, capacities, null, false, false, false, null, null, null);
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
|
|
||||||
// move file to ARCHIVE
|
// move file to ARCHIVE
|
||||||
@ -982,7 +982,7 @@ public void testMoverWithStripedFile() throws Exception {
|
|||||||
{ StorageType.SSD, StorageType.DISK },
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
{ StorageType.SSD, StorageType.DISK },
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
{ StorageType.SSD, StorageType.DISK } },
|
{ StorageType.SSD, StorageType.DISK } },
|
||||||
true, null, null, null, capacities, null, false, false, false, null);
|
true, null, null, null, capacities, null, false, false, false, null, null, null);
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
|
|
||||||
// move file blocks to ONE_SSD policy
|
// move file blocks to ONE_SSD policy
|
||||||
@ -1372,7 +1372,7 @@ private void startAdditionalDNs(final Configuration conf,
|
|||||||
final MiniDFSCluster cluster) throws IOException {
|
final MiniDFSCluster cluster) throws IOException {
|
||||||
|
|
||||||
cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
|
cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
|
||||||
null, null, null, false, false, false, null);
|
null, null, null, false, false, false, null, null, null);
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,7 +186,7 @@ public void testMoverWithFullStripe() throws Exception {
|
|||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
|
||||||
true, null, null, null, capacities, null, false, false, false, null);
|
true, null, null, null, capacities, null, false, false, false, null, null, null);
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
|
|
||||||
// move file to ARCHIVE
|
// move file to ARCHIVE
|
||||||
@ -294,7 +294,7 @@ public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy()
|
|||||||
new StorageType[][]{
|
new StorageType[][]{
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
|
||||||
true, null, null, null, capacities, null, false, false, false, null);
|
true, null, null, null, capacities, null, false, false, false, null, null, null);
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
|
|
||||||
// Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE
|
// Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE
|
||||||
|
@ -1712,7 +1712,7 @@ private void startAdditionalDNs(final Configuration conf,
|
|||||||
}
|
}
|
||||||
|
|
||||||
cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
|
cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
|
||||||
null, capacities, null, false, false, false, null);
|
null, capacities, null, false, false, false, null, null, null);
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,7 +131,7 @@ public int run(String[] args) throws Exception {
|
|||||||
+ " block listing files; launching DataNodes accordingly.");
|
+ " block listing files; launching DataNodes accordingly.");
|
||||||
mc.startDataNodes(getConf(), blockListFiles.size(), null, false,
|
mc.startDataNodes(getConf(), blockListFiles.size(), null, false,
|
||||||
StartupOption.REGULAR, null, null, null, null, false, true, true,
|
StartupOption.REGULAR, null, null, null, null, false, true, true,
|
||||||
null);
|
null, null, null);
|
||||||
long startTime = Time.monotonicNow();
|
long startTime = Time.monotonicNow();
|
||||||
System.out.println("Waiting for DataNodes to connect to NameNode and "
|
System.out.println("Waiting for DataNodes to connect to NameNode and "
|
||||||
+ "init storage directories.");
|
+ "init storage directories.");
|
||||||
|
Loading…
Reference in New Issue
Block a user