HDFS-17146. Use the dfsadmin -reconfig command to initiate reconfiguration on all decommissioning datanodes. (#6504) Contributed by Hualong Zhang.
Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>

parent 12498b35bb
commit 9751b6e41a
@@ -35,6 +35,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -453,7 +454,8 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep
       "\t[-refreshSuperUserGroupsConfiguration]\n" +
       "\t[-refreshCallQueue]\n" +
       "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
-      "\t[-reconfig <namenode|datanode> <host:ipc_port|livenodes> <start|status|properties>]\n" +
+      "\t[-reconfig <namenode|datanode> <host:ipc_port|livenodes|decomnodes>\n" +
+      "\t<start|status|properties>]\n" +
       "\t[-printTopology]\n" +
       "\t[-refreshNamenodes datanode_host:ipc_port]\n" +
       "\t[-getVolumeReport datanode_host:ipc_port]\n" +
@@ -1251,14 +1253,15 @@ private void printHelp(String cmd) {
 
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
-    String reconfig = "-reconfig <namenode|datanode> <host:ipc_port|livenodes> " +
+    String reconfig = "-reconfig <namenode|datanode> <host:ipc_port|livenodes|decomnodes> " +
         "<start|status|properties>:\n" +
         "\tStarts or gets the status of a reconfiguration operation, \n" +
         "\tor gets a list of reconfigurable properties.\n" +
         "\tThe second parameter specifies the node type\n" +
         "\tThe third parameter specifies host address. For start or status, \n" +
-        "\tdatanode supports livenodes as third parameter, which will start \n" +
-        "\tor retrieve reconfiguration on all live datanodes.";
+        "\tdatanode supports livenodes and decomnodes as the third parameter, \n" +
+        "\twhich will start or retrieve reconfiguration on all live " +
+        "\tor decommissioning datanodes. \n";
     String genericRefresh = "-refresh: Arguments are <hostname:ipc_port>" +
         " <resource_identifier> [arg1..argn]\n" +
         "\tTriggers a runtime-refresh of the resource specified by " +
@@ -1928,14 +1931,14 @@ public int reconfig(String[] argv, int i) throws IOException, InterruptedExcepti
     return -1;
   }
 
-  int startReconfiguration(final String nodeThpe, final String address)
+  int startReconfiguration(final String nodeType, final String address)
       throws IOException, InterruptedException {
-    return startReconfigurationUtil(nodeThpe, address, System.out, System.err);
+    return startReconfigurationUtil(nodeType, address, System.out, System.err);
   }
 
   int startReconfigurationUtil(final String nodeType, final String address, final PrintStream out,
       final PrintStream err) throws IOException, InterruptedException {
-    if (!"livenodes".equals(address)) {
+    if (!"livenodes".equals(address) && !"decomnodes".equals(address)) {
       return startReconfiguration(nodeType, address, out, err);
     }
     if (!"datanode".equals(nodeType)) {
@@ -1944,23 +1947,28 @@ int startReconfigurationUtil(final String nodeType, final String address, final
     }
     ExecutorService executorService = Executors.newFixedThreadPool(5);
     DistributedFileSystem dfs = getDFS();
-    DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+    final DatanodeInfo[] nodes = "livenodes".equals(address) ?
+        dfs.getDataNodeStats(DatanodeReportType.LIVE) :
+        dfs.getDataNodeStats(DatanodeReportType.DECOMMISSIONING);
     AtomicInteger successCount = new AtomicInteger();
     AtomicInteger failCount = new AtomicInteger();
     if (nodes != null) {
+      final CountDownLatch latch = new CountDownLatch(nodes.length);
       for (DatanodeInfo node : nodes) {
         executorService.submit(() -> {
+          try {
             int status = startReconfiguration(nodeType, node.getIpcAddr(false), out, err);
             if (status == 0) {
               successCount.incrementAndGet();
             } else {
               failCount.incrementAndGet();
             }
+          } finally {
+            latch.countDown();
+          }
         });
       }
-      while ((successCount.get() + failCount.get()) < nodes.length) {
-        Thread.sleep(1000);
-      }
+      latch.await();
       executorService.shutdown();
       if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
         err.println("Executor service could not be terminated in 60s. Please wait for"
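Aside: the hunk above replaces the old one-second `Thread.sleep` polling loop with a `CountDownLatch` that is counted down in a `finally` block, so a task that throws still releases the waiter. A minimal self-contained sketch of that pattern, with a hypothetical node list and a fake per-node status standing in for the real `startReconfiguration` call:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchFanOut {
  public static void main(String[] args) throws InterruptedException {
    final String[] nodes = {"dn1:9867", "dn2:9867", "dn3:9867"}; // hypothetical IPC addresses
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    AtomicInteger successCount = new AtomicInteger();
    AtomicInteger failCount = new AtomicInteger();
    final CountDownLatch latch = new CountDownLatch(nodes.length);
    for (String node : nodes) {
      executorService.submit(() -> {
        try {
          // Stand-in for startReconfiguration(nodeType, node, out, err).
          int status = node.startsWith("dn1") ? 1 : 0;
          if (status == 0) {
            successCount.incrementAndGet();
          } else {
            failCount.incrementAndGet();
          }
        } finally {
          latch.countDown(); // runs even if the task body throws
        }
      });
    }
    latch.await(); // block until every task has finished; no polling
    executorService.shutdown();
    if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
      System.err.println("Executor service could not be terminated in 60s.");
    }
    System.out.println("successful on " + successCount.get()
        + " nodes, failed on " + failCount.get() + " nodes.");
  }
}
```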
@@ -2022,7 +2030,7 @@ int startReconfigurationDispatch(final String nodeType,
 
   int getReconfigurationStatusUtil(final String nodeType, final String address,
       final PrintStream out, final PrintStream err) throws IOException, InterruptedException {
-    if (!"livenodes".equals(address)) {
+    if (!"livenodes".equals(address) && !"decomnodes".equals(address)) {
       return getReconfigurationStatus(nodeType, address, out, err);
     }
     if (!"datanode".equals(nodeType)) {
@@ -2031,23 +2039,28 @@ int getReconfigurationStatusUtil(final String nodeType, final String address,
     }
     ExecutorService executorService = Executors.newFixedThreadPool(5);
     DistributedFileSystem dfs = getDFS();
-    DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+    final DatanodeInfo[] nodes = "livenodes".equals(address) ?
+        dfs.getDataNodeStats(DatanodeReportType.LIVE) :
+        dfs.getDataNodeStats(DatanodeReportType.DECOMMISSIONING);
     AtomicInteger successCount = new AtomicInteger();
     AtomicInteger failCount = new AtomicInteger();
     if (nodes != null) {
+      final CountDownLatch latch = new CountDownLatch(nodes.length);
       for (DatanodeInfo node : nodes) {
         executorService.submit(() -> {
+          try {
             int status = getReconfigurationStatus(nodeType, node.getIpcAddr(false), out, err);
             if (status == 0) {
               successCount.incrementAndGet();
             } else {
               failCount.incrementAndGet();
             }
+          } finally {
+            latch.countDown();
+          }
         });
       }
-      while ((successCount.get() + failCount.get()) < nodes.length) {
-        Thread.sleep(1000);
-      }
+      latch.await();
       executorService.shutdown();
       if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
         err.println("Executor service could not be terminated in 60s. Please wait for"
@@ -2312,7 +2325,7 @@ private static void printUsage(String cmd) {
           + " [-refreshCallQueue]");
     } else if ("-reconfig".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
-          + " [-reconfig <namenode|datanode> <host:ipc_port|livenodes> "
+          + " [-reconfig <namenode|datanode> <host:ipc_port|livenodes|decomnodes> "
           + "<start|status|properties>]");
     } else if ("-refresh".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
@@ -380,7 +380,7 @@ Usage:
     hdfs dfsadmin [-refreshSuperUserGroupsConfiguration]
     hdfs dfsadmin [-refreshCallQueue]
     hdfs dfsadmin [-refresh <host:ipc_port> <key> [arg1..argn]]
-    hdfs dfsadmin [-reconfig <namenode|datanode> <host:ipc_port|livenodes> <start |status |properties>]
+    hdfs dfsadmin [-reconfig <namenode|datanode> <host:ipc_port|livenodes|decomnodes> <start |status |properties>]
     hdfs dfsadmin [-printTopology]
     hdfs dfsadmin [-refreshNamenodes datanodehost:port]
     hdfs dfsadmin [-getVolumeReport datanodehost:port]
@@ -419,7 +419,7 @@ Usage:
 | `-refreshSuperUserGroupsConfiguration` | Refresh superuser proxy groups mappings |
 | `-refreshCallQueue` | Reload the call queue from config. |
 | `-refresh` \<host:ipc\_port\> \<key\> [arg1..argn] | Triggers a runtime-refresh of the resource specified by \<key\> on \<host:ipc\_port\>. All other args after are sent to the host. |
-| `-reconfig` \<datanode \|namenode\> \<host:ipc\_port\|livenodes> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, datanode supports livenodes as third parameter, which will start or retrieve reconfiguration on all live datanodes. |
+| `-reconfig` \<datanode \|namenode\> \<host:ipc\_port\|livenodes\|decomnodes\> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, datanode supports livenodes and decomnodes as the third parameter, which will start or retrieve reconfiguration on all live or decommissioning datanodes. |
 | `-printTopology` | Print a tree of the racks and their nodes as reported by the Namenode |
 | `-refreshNamenodes` datanodehost:port | For the given datanode, reloads the configuration files, stops serving the removed block-pools and starts serving new block-pools. |
 | `-getVolumeReport` datanodehost:port | For the given datanode, get the volume report. |
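Note: the table documents the CLI surface; the same `decomnodes` operation can also be driven programmatically, since `DFSAdmin` implements the Hadoop `Tool` interface. A hedged sketch, not part of this change (the class name and the assumption that HDFS client configuration is on the classpath are mine):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.ToolRunner;

public class ReconfigDecomNodes {
  public static void main(String[] args) throws Exception {
    // Picks up hdfs-site.xml/core-site.xml from the classpath.
    Configuration conf = new HdfsConfiguration();
    // Equivalent to: hdfs dfsadmin -reconfig datanode decomnodes start
    int exitCode = ToolRunner.run(conf, new DFSAdmin(conf),
        new String[] {"-reconfig", "datanode", "decomnodes", "start"});
    System.exit(exitCode);
  }
}
```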
@@ -352,12 +352,14 @@ Datanode supports hot swappable drives. The user can add or replace HDFS data vo
 the reconfiguration process. The user can use
 `dfsadmin -reconfig datanode HOST:PORT status`
 to query the running status of the reconfiguration task. In place of
-HOST:PORT, we can also specify livenodes for datanode. It would allow
-start or query reconfiguration on all live datanodes, whereas specifying
-HOST:PORT would only allow start or query of reconfiguration on the
-particular datanode represented by HOST:PORT. The examples for livenodes
-queries are `dfsadmin -reconfig datanode livenodes start` and
+HOST:PORT, specifying `livenodes` would allow start or query of reconfiguration
+on all live datanodes, while `decomnodes` would target decommissioning datanodes.
+Specifying HOST:PORT would only allow start or query of reconfiguration on the
+particular datanode represented by HOST:PORT.
+Examples for `livenodes` queries are `dfsadmin -reconfig datanode livenodes start` and
 `dfsadmin -reconfig datanode livenodes status`.
+For `decomnodes`, the commands are `dfsadmin -reconfig datanode decomnodes start`
+and `dfsadmin -reconfig datanode decomnodes status`.
 
 * Once the reconfiguration task has completed, the user can safely `umount`
 the removed data volume directories and physically remove the disks.
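Note: as the doc text above describes, `decomnodes` fans out to every datanode in the DECOMMISSIONING report, the set returned by `getDataNodeStats(DatanodeReportType.DECOMMISSIONING)`. A short sketch of how a client could enumerate that same target set (assuming `fs.defaultFS` points at an HDFS cluster; the class name is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;

public class ListDecomNodes {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      // livenodes -> DatanodeReportType.LIVE, decomnodes -> DatanodeReportType.DECOMMISSIONING
      DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.DECOMMISSIONING);
      for (DatanodeInfo node : nodes) {
        System.out.println(node.getIpcAddr(false)); // host:ipc_port, the address dfsadmin targets
      }
    }
  }
}
```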
@@ -46,6 +46,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.text.TextStringBuilder;
@@ -96,6 +97,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Assert;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1283,4 +1285,100 @@ public void testAllDatanodesReconfig()
     Assertions.assertThat(outs.subList(1, 5)).containsSubsequence(success, from, to);
     Assertions.assertThat(outs.subList(5, 9)).containsSubsequence(success, from, to, retrieval);
   }
 
+  @Test
+  public void testDecommissionDataNodesReconfig()
+      throws IOException, InterruptedException, TimeoutException {
+    redirectStream();
+    final Configuration dfsConf = new HdfsConfiguration();
+    try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(dfsConf)
+        .numDataNodes(3).build()) {
+      ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
+      miniCluster.getDataNodes().forEach(node -> node.setReconfigurationUtil(reconfigurationUtil));
+      List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
+      changes.add(new ReconfigurationUtil.PropertyChange(
+          DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, "1000",
+          datanode.getConf().get(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY)));
+      when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
+          any(Configuration.class))).thenReturn(changes);
+
+      DFSAdmin dfsAdmin = Mockito.spy(new DFSAdmin(dfsConf));
+      DistributedFileSystem dfs = Mockito.spy(miniCluster.getFileSystem());
+      DatanodeInfo decommissioningNode1 = dfs.getDataNodeStats()[0];
+      DatanodeInfo decommissioningNode2 = dfs.getDataNodeStats()[1];
+      DatanodeInfo[] dataNodeStats = new DatanodeInfo[]{decommissioningNode1, decommissioningNode2};
+      when(dfsAdmin.getDFS()).thenReturn(dfs);
+      when(dfs.getDataNodeStats(DatanodeReportType.DECOMMISSIONING)).thenReturn(dataNodeStats);
+
+      int ret = dfsAdmin.startReconfiguration("datanode", "decomnodes");
+
+      // collect outputs
+      final List<String> outsForStartReconf = Lists.newArrayList();
+      final List<String> errsForStartReconf = Lists.newArrayList();
+      scanIntoList(out, outsForStartReconf);
+      scanIntoList(err, errsForStartReconf);
+
+      // verify startReconfiguration results is as expected
+      assertEquals(0, ret);
+      String started = "Started reconfiguration task on node";
+      String starting =
+          "Starting of reconfiguration task successful on 2 nodes, failed on 0 nodes.";
+      Assertions.assertThat(outsForStartReconf).hasSize(3);
+      Assertions.assertThat(errsForStartReconf).hasSize(0);
+      Assertions.assertThat(outsForStartReconf.get(0)).startsWith(started);
+      Assertions.assertThat(outsForStartReconf.get(1)).startsWith(started);
+      Assertions.assertThat(outsForStartReconf.get(2)).startsWith(starting);
+
+      // verify getReconfigurationStatus results is as expected
+      Thread.sleep(1000);
+      resetStream();
+      final List<String> outsForFinishReconf = Lists.newArrayList();
+      final List<String> errsForFinishReconf = Lists.newArrayList();
+      waitForReconfigurationDecommissionNode("datanode", "decomnodes",
+          dfsAdmin, outsForFinishReconf, errsForFinishReconf);
+      String success = "SUCCESS: Changed property " +
+          DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
+      String from = "\tFrom: \"0\"";
+      String to = "\tTo: \"1000\"";
+      String retrieval =
+          "Retrieval of reconfiguration status successful on 2 nodes, failed on 0 nodes.";
+
+      Assertions.assertThat(outsForFinishReconf.subList(1, 5)).
+          containsSubsequence(success, from, to);
+      Assertions.assertThat(outsForFinishReconf.subList(5, 9)).
+          containsSubsequence(success, from, to, retrieval);
+
+      // verify refreshed decommissioningNode is as expected
+      String node1Addr = decommissioningNode1.getIpAddr() + ":" +
+          decommissioningNode1.getIpcPort();
+      String node2Addr = decommissioningNode2.getIpAddr() + ":" +
+          decommissioningNode2.getIpcPort();
+      assertTrue(outsForFinishReconf.get(0).contains(node1Addr)
+          && outsForFinishReconf.get(0).contains(node2Addr));
+    }
+  }
+
+  private void waitForReconfigurationDecommissionNode(final String nodeType, final String address,
+      DFSAdmin dfsAdmin, List<String> outs, List<String> errs)
+      throws TimeoutException, InterruptedException {
+    PrintStream outStream = new PrintStream(out);
+    PrintStream errStream = new PrintStream(err);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LocatedBlocks blocks = null;
+        try {
+          dfsAdmin.getReconfigurationStatusUtil("datanode", "decomnodes",
+              outStream, errStream);
+        } catch (IOException | InterruptedException e) {
+          LOG.error(String.format(
+              "call getReconfigurationStatus on %s[%s] failed.", nodeType,
+              address), e);
+        }
+        scanIntoList(out, outs);
+        scanIntoList(err, errs);
+        return !outs.isEmpty() && outs.get(0).contains("finished");
+      }
+    }, 100, 100 * 100);
+  }
 }