HDFS-4261. Fix bugs in Balancer that it does not terminate in some cases and it checks BlockPlacementPolicy instance incorrectly. Contributed by Junping Du
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1419192 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bcaba93941
commit
8f5bd5f1bc
@ -269,6 +269,10 @@ Trunk (Unreleased)
|
||||
HDFS-4260 Fix HDFS tests to set test dir to a valid HDFS path as opposed
|
||||
to the local build path (Chri Nauroth via Sanjay)
|
||||
|
||||
HDFS-4261. Fix bugs in Balancer that it does not terminate in some cases
|
||||
and it checks BlockPlacementPolicy instance incorrectly. (Junping Du via
|
||||
szetszwo)
|
||||
|
||||
Release 2.0.3-alpha - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -224,7 +224,6 @@ public class Balancer {
|
||||
= new HashMap<String, BalancerDatanode>();
|
||||
|
||||
private NetworkTopology cluster;
|
||||
|
||||
final static private int MOVER_THREAD_POOL_SIZE = 1000;
|
||||
final private ExecutorService moverExecutor =
|
||||
Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
|
||||
@ -801,8 +800,8 @@ private void dispatchBlocks() {
|
||||
*/
|
||||
private static void checkReplicationPolicyCompatibility(Configuration conf
|
||||
) throws UnsupportedActionException {
|
||||
if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof
|
||||
BlockPlacementPolicyDefault) {
|
||||
if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof
|
||||
BlockPlacementPolicyDefault)) {
|
||||
throw new UnsupportedActionException(
|
||||
"Balancer without BlockPlacementPolicyDefault");
|
||||
}
|
||||
@ -1085,7 +1084,6 @@ private long get() {
|
||||
}
|
||||
};
|
||||
private BytesMoved bytesMoved = new BytesMoved();
|
||||
private int notChangedIterations = 0;
|
||||
|
||||
/* Start a thread to dispatch block moves for each source.
|
||||
* The thread selects blocks to move & sends request to proxy source to
|
||||
@ -1384,15 +1382,8 @@ private ReturnStatus run(int iteration, Formatter formatter,
|
||||
* available to move.
|
||||
* Exit no byte has been moved for 5 consecutive iterations.
|
||||
*/
|
||||
if (dispatchBlockMoves() > 0) {
|
||||
notChangedIterations = 0;
|
||||
} else {
|
||||
notChangedIterations++;
|
||||
if (notChangedIterations >= 5) {
|
||||
System.out.println(
|
||||
"No block has been moved for 5 iterations. Exiting...");
|
||||
return ReturnStatus.NO_MOVE_PROGRESS;
|
||||
}
|
||||
if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
|
||||
return ReturnStatus.NO_MOVE_PROGRESS;
|
||||
}
|
||||
|
||||
// clean all lists
|
||||
|
@ -52,6 +52,7 @@
|
||||
class NameNodeConnector {
|
||||
private static final Log LOG = Balancer.LOG;
|
||||
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
||||
private static final int MAX_NOT_CHANGED_INTERATIONS = 5;
|
||||
|
||||
final URI nameNodeUri;
|
||||
final String blockpoolID;
|
||||
@ -65,6 +66,8 @@ class NameNodeConnector {
|
||||
private final boolean encryptDataTransfer;
|
||||
private boolean shouldRun;
|
||||
private long keyUpdaterInterval;
|
||||
// used for balancer
|
||||
private int notChangedIterations = 0;
|
||||
private BlockTokenSecretManager blockTokenSecretManager;
|
||||
private Daemon keyupdaterthread; // AccessKeyUpdater thread
|
||||
private DataEncryptionKey encryptionKey;
|
||||
@ -119,6 +122,20 @@ class NameNodeConnector {
|
||||
}
|
||||
}
|
||||
|
||||
boolean shouldContinue(long dispatchBlockMoveBytes) {
|
||||
if (dispatchBlockMoveBytes > 0) {
|
||||
notChangedIterations = 0;
|
||||
} else {
|
||||
notChangedIterations++;
|
||||
if (notChangedIterations >= MAX_NOT_CHANGED_INTERATIONS) {
|
||||
System.out.println("No block has been moved for "
|
||||
+ notChangedIterations + " iterations. Exiting...");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Get an access token for a block. */
|
||||
Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
|
||||
) throws IOException {
|
||||
|
@ -41,6 +41,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.junit.Test;
|
||||
import junit.framework.Assert;
|
||||
|
||||
/**
|
||||
* This class tests if a balancer schedules tasks correctly.
|
||||
@ -174,6 +175,19 @@ private void runBalancer(Configuration conf,
|
||||
LOG.info("Rebalancing with default factor.");
|
||||
waitForBalancer(totalUsedSpace, totalCapacity);
|
||||
}
|
||||
|
||||
private void runBalancerCanFinish(Configuration conf,
|
||||
long totalUsedSpace, long totalCapacity) throws Exception {
|
||||
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
||||
|
||||
// start rebalancing
|
||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
||||
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
|
||||
Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
|
||||
(r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
|
||||
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
||||
LOG.info("Rebalancing with default factor.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a cluster with even distribution, and a new empty node is added to
|
||||
@ -289,4 +303,49 @@ public void testBalancerWithNodeGroup() throws Exception {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
|
||||
* in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster
|
||||
* to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according
|
||||
* to replica placement policy with NodeGroup. As a result, n2 and n3 will be
|
||||
* filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3
|
||||
* to n0 or n1 as balancer policy with node group. Thus, we expect the balancer
|
||||
* to end in 5 iterations without move block process.
|
||||
*/
|
||||
@Test
|
||||
public void testBalancerEndInNoMoveProgress() throws Exception {
|
||||
Configuration conf = createConf();
|
||||
long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
|
||||
String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
|
||||
String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
|
||||
|
||||
int numOfDatanodes = capacities.length;
|
||||
assertEquals(numOfDatanodes, racks.length);
|
||||
assertEquals(numOfDatanodes, nodeGroups.length);
|
||||
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(capacities.length)
|
||||
.racks(racks)
|
||||
.simulatedCapacities(capacities);
|
||||
MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
|
||||
cluster = new MiniDFSClusterWithNodeGroup(builder);
|
||||
try {
|
||||
cluster.waitActive();
|
||||
client = NameNodeProxies.createProxy(conf,
|
||||
cluster.getFileSystem(0).getUri(),
|
||||
ClientProtocol.class).getProxy();
|
||||
|
||||
long totalCapacity = TestBalancer.sum(capacities);
|
||||
// fill up the cluster to be 60% full
|
||||
long totalUsedSpace = totalCapacity * 6 / 10;
|
||||
TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3,
|
||||
(short) (3), 0);
|
||||
|
||||
// run balancer which can finish in 5 iterations with no block movement.
|
||||
runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
|
||||
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user