HDFS-15655. Add option to make balancer prefer to get cold blocks. Contributed by Yang Yun.
This commit is contained in:
parent
3ecd3628d4
commit
2aea43bf4f
@ -53,7 +53,7 @@ public RouterNamenodeProtocol(RouterRpcServer server) {
|
||||
|
||||
@Override
|
||||
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
|
||||
long minBlockSize) throws IOException {
|
||||
long minBlockSize, long hotBlockTimeInterval) throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.READ);
|
||||
|
||||
// Get the namespace where the datanode is located
|
||||
@ -78,9 +78,9 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
|
||||
// Forward to the proper namenode
|
||||
if (nsId != null) {
|
||||
RemoteMethod method = new RemoteMethod(
|
||||
NamenodeProtocol.class, "getBlocks",
|
||||
new Class<?>[] {DatanodeInfo.class, long.class, long.class},
|
||||
datanode, size, minBlockSize);
|
||||
NamenodeProtocol.class, "getBlocks", new Class<?>[]
|
||||
{DatanodeInfo.class, long.class, long.class, long.class},
|
||||
datanode, size, minBlockSize, hotBlockTimeInterval);
|
||||
return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
|
||||
}
|
||||
return null;
|
||||
|
@ -1490,8 +1490,9 @@ public void satisfyStoragePolicy(String path) throws IOException {
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
|
||||
long minBlockSize) throws IOException {
|
||||
return nnProto.getBlocks(datanode, size, minBlockSize);
|
||||
long minBlockSize, long hotBlockTimeInterval) throws IOException {
|
||||
return nnProto.getBlocks(datanode, size, minBlockSize,
|
||||
hotBlockTimeInterval);
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
|
@ -1350,9 +1350,9 @@ public void testProxyGetBlocks() throws Exception {
|
||||
|
||||
// Verify that checking that datanode works
|
||||
BlocksWithLocations routerBlockLocations =
|
||||
routerNamenodeProtocol.getBlocks(dn0, 1024, 0);
|
||||
routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
|
||||
BlocksWithLocations nnBlockLocations =
|
||||
nnNamenodeProtocol.getBlocks(dn0, 1024, 0);
|
||||
nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
|
||||
BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
|
||||
BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
|
||||
assertEquals(nnBlocks.length, routerBlocks.length);
|
||||
|
@ -713,6 +713,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final long DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB
|
||||
public static final String DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size";
|
||||
public static final long DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB
|
||||
public static final String DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_KEY =
|
||||
"dfs.balancer.getBlocks.hot-time-interval";
|
||||
public static final long DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_DEFAULT =
|
||||
0;
|
||||
public static final String DFS_BALANCER_KEYTAB_ENABLED_KEY = "dfs.balancer.keytab.enabled";
|
||||
public static final boolean DFS_BALANCER_KEYTAB_ENABLED_DEFAULT = false;
|
||||
public static final String DFS_BALANCER_ADDRESS_KEY = "dfs.balancer.address";
|
||||
|
@ -89,7 +89,7 @@ public GetBlocksResponseProto getBlocks(RpcController unused,
|
||||
BlocksWithLocations blocks;
|
||||
try {
|
||||
blocks = impl.getBlocks(dnInfo, request.getSize(),
|
||||
request.getMinBlockSize());
|
||||
request.getMinBlockSize(), request.getTimeInterval());
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
|
@ -102,11 +102,11 @@ public Object getUnderlyingProxyObject() {
|
||||
|
||||
@Override
|
||||
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
||||
minBlockSize)
|
||||
minBlockSize, long timeInterval)
|
||||
throws IOException {
|
||||
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
|
||||
.setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
|
||||
.setMinBlockSize(minBlockSize).build();
|
||||
.setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build();
|
||||
try {
|
||||
return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)
|
||||
.getBlocks());
|
||||
|
@ -203,6 +203,7 @@ public class Balancer {
|
||||
+ "on over-utilized machines."
|
||||
+ "\n\t[-asService]\tRun as a long running service."
|
||||
+ "\n\t[-sortTopNodes]"
|
||||
+ "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks."
|
||||
+ "\tSort datanodes based on the utilization so "
|
||||
+ "that highly utilized datanodes get scheduled first.";
|
||||
|
||||
@ -315,6 +316,14 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
|
||||
final long maxIterationTime = conf.getLong(
|
||||
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
|
||||
/**
|
||||
* Balancer prefer to get blocks which are belong to the cold files
|
||||
* created before this time period.
|
||||
*/
|
||||
final long hotBlockTimeInterval = conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
// DataNode configuration parameters for balancing
|
||||
final int maxConcurrentMovesPerNode = getInt(conf,
|
||||
@ -329,7 +338,7 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
|
||||
p.getExcludedNodes(), movedWinWidth, moverThreads,
|
||||
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
|
||||
getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval,
|
||||
maxIterationTime, conf);
|
||||
maxIterationTime, hotBlockTimeInterval, conf);
|
||||
this.threshold = p.getThreshold();
|
||||
this.policy = p.getBalancingPolicy();
|
||||
this.sourceNodes = p.getSourceNodes();
|
||||
@ -990,6 +999,14 @@ static BalancerParameters parse(String[] args) {
|
||||
} else if ("-asService".equalsIgnoreCase(args[i])) {
|
||||
b.setRunAsService(true);
|
||||
LOG.info("Balancer will run as a long running service");
|
||||
} else if ("-hotBlockTimeInterval".equalsIgnoreCase(args[i])) {
|
||||
checkArgument(++i < args.length,
|
||||
"hotBlockTimeInterval value is missing: args = "
|
||||
+ Arrays.toString(args));
|
||||
long hotBlockTimeInterval = Long.parseLong(args[i]);
|
||||
LOG.info("Using a hotBlockTimeInterval of "
|
||||
+ hotBlockTimeInterval);
|
||||
b.setHotBlockTimeInterval(hotBlockTimeInterval);
|
||||
} else if ("-sortTopNodes".equalsIgnoreCase(args[i])) {
|
||||
b.setSortTopNodes(true);
|
||||
LOG.info("Balancer will sort nodes by" +
|
||||
|
@ -27,6 +27,7 @@ final class BalancerParameters {
|
||||
private final BalancingPolicy policy;
|
||||
private final double threshold;
|
||||
private final int maxIdleIteration;
|
||||
private final long hotBlockTimeInterval;
|
||||
/** Exclude the nodes in this set. */
|
||||
private final Set<String> excludedNodes;
|
||||
/** If empty, include any node; otherwise, include only these nodes. */
|
||||
@ -66,6 +67,7 @@ private BalancerParameters(Builder builder) {
|
||||
this.runDuringUpgrade = builder.runDuringUpgrade;
|
||||
this.runAsService = builder.runAsService;
|
||||
this.sortTopNodes = builder.sortTopNodes;
|
||||
this.hotBlockTimeInterval = builder.hotBlockTimeInterval;
|
||||
}
|
||||
|
||||
BalancingPolicy getBalancingPolicy() {
|
||||
@ -113,12 +115,13 @@ public String toString() {
|
||||
return String.format("%s.%s [%s," + " threshold = %s,"
|
||||
+ " max idle iteration = %s," + " #excluded nodes = %s,"
|
||||
+ " #included nodes = %s," + " #source nodes = %s,"
|
||||
+ " #blockpools = %s," + " run during upgrade = %s]"
|
||||
+ " #blockpools = %s," + " run during upgrade = %s,"
|
||||
+ " hot block time interval = %s]"
|
||||
+ " sort top nodes = %s",
|
||||
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
|
||||
threshold, maxIdleIteration, excludedNodes.size(),
|
||||
includedNodes.size(), sourceNodes.size(), blockpools.size(),
|
||||
runDuringUpgrade, sortTopNodes);
|
||||
runDuringUpgrade, sortTopNodes, hotBlockTimeInterval);
|
||||
}
|
||||
|
||||
static class Builder {
|
||||
@ -134,6 +137,7 @@ static class Builder {
|
||||
private boolean runDuringUpgrade = false;
|
||||
private boolean runAsService = false;
|
||||
private boolean sortTopNodes = false;
|
||||
private long hotBlockTimeInterval = 0;
|
||||
|
||||
Builder() {
|
||||
}
|
||||
@ -153,6 +157,11 @@ Builder setMaxIdleIteration(int m) {
|
||||
return this;
|
||||
}
|
||||
|
||||
Builder setHotBlockTimeInterval(long t) {
|
||||
this.hotBlockTimeInterval = t;
|
||||
return this;
|
||||
}
|
||||
|
||||
Builder setExcludedNodes(Set<String> nodes) {
|
||||
this.excludedNodes = nodes;
|
||||
return this;
|
||||
|
@ -128,6 +128,7 @@ public class Dispatcher {
|
||||
private final long getBlocksSize;
|
||||
private final long getBlocksMinBlockSize;
|
||||
private final long blockMoveTimeout;
|
||||
private final long hotBlockTimeInterval;
|
||||
/**
|
||||
* If no block can be moved out of a {@link Source} after this configured
|
||||
* amount of time, the Source should give up choosing the next possible move.
|
||||
@ -797,7 +798,8 @@ Iterator<DBlock> getBlockIterator() {
|
||||
private long getBlockList() throws IOException {
|
||||
final long size = Math.min(getBlocksSize, blocksToReceive);
|
||||
final BlocksWithLocations newBlksLocs =
|
||||
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
|
||||
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize,
|
||||
hotBlockTimeInterval);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
|
||||
@ -1011,14 +1013,15 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||
int maxNoMoveInterval, Configuration conf) {
|
||||
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
||||
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
||||
0L, 0L, 0, maxNoMoveInterval, -1, conf);
|
||||
0L, 0L, 0, maxNoMoveInterval, -1, 0, conf);
|
||||
}
|
||||
|
||||
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
||||
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
||||
long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout,
|
||||
int maxNoMoveInterval, long maxIterationTime, Configuration conf) {
|
||||
int maxNoMoveInterval, long maxIterationTime, long hotBlockTimeInterval,
|
||||
Configuration conf) {
|
||||
this.nnc = nnc;
|
||||
this.excludedNodes = excludedNodes;
|
||||
this.includedNodes = includedNodes;
|
||||
@ -1034,6 +1037,7 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||
|
||||
this.getBlocksSize = getBlocksSize;
|
||||
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
|
||||
this.hotBlockTimeInterval = hotBlockTimeInterval;
|
||||
this.blockMoveTimeout = blockMoveTimeout;
|
||||
this.maxNoMoveInterval = maxNoMoveInterval;
|
||||
|
||||
|
@ -249,7 +249,7 @@ public URI getNameNodeUri() {
|
||||
|
||||
/** @return blocks with locations. */
|
||||
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
||||
minBlockSize) throws IOException {
|
||||
minBlockSize, long timeInterval) throws IOException {
|
||||
if (getBlocksRateLimiter != null) {
|
||||
getBlocksRateLimiter.acquire();
|
||||
}
|
||||
@ -284,7 +284,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
||||
} else {
|
||||
nnproxy = namenode;
|
||||
}
|
||||
return nnproxy.getBlocks(datanode, size, minBlockSize);
|
||||
return nnproxy.getBlocks(datanode, size, minBlockSize, timeInterval);
|
||||
} finally {
|
||||
if (isRequestStandby) {
|
||||
LOG.info("Request #getBlocks to Standby NameNode success.");
|
||||
|
@ -89,6 +89,7 @@
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||
@ -1635,9 +1636,23 @@ public boolean isSufficientlyReplicated(BlockInfo b) {
|
||||
return liveReplicas >= getDatanodeManager().getNumLiveDataNodes();
|
||||
}
|
||||
|
||||
private boolean isHotBlock(BlockInfo blockInfo, long time) {
|
||||
INodeFile iFile = (INodeFile)getBlockCollection(blockInfo);
|
||||
if(iFile == null) {
|
||||
return false;
|
||||
}
|
||||
if(iFile.isUnderConstruction()) {
|
||||
return true;
|
||||
}
|
||||
if (iFile.getAccessTime() > time || iFile.getModificationTime() > time) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Get all blocks with location information from a datanode. */
|
||||
public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
|
||||
final long size, final long minBlockSize) throws
|
||||
final long size, final long minBlockSize, final long timeInterval) throws
|
||||
UnregisteredNodeException {
|
||||
final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
|
||||
if (node == null) {
|
||||
@ -1655,15 +1670,21 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
|
||||
int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
|
||||
Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
|
||||
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
|
||||
List<BlockInfo> pending = new ArrayList<BlockInfo>();
|
||||
long totalSize = 0;
|
||||
BlockInfo curBlock;
|
||||
long hotTimePos = Time.now() - timeInterval;
|
||||
while(totalSize<size && iter.hasNext()) {
|
||||
curBlock = iter.next();
|
||||
if(!curBlock.isComplete()) continue;
|
||||
if (curBlock.getNumBytes() < minBlockSize) {
|
||||
continue;
|
||||
}
|
||||
totalSize += addBlock(curBlock, results);
|
||||
if(timeInterval > 0 && isHotBlock(curBlock, hotTimePos)) {
|
||||
pending.add(curBlock);
|
||||
} else {
|
||||
totalSize += addBlock(curBlock, results);
|
||||
}
|
||||
}
|
||||
if(totalSize<size) {
|
||||
iter = node.getBlockIterator(); // start from the beginning
|
||||
@ -1673,10 +1694,19 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
|
||||
if (curBlock.getNumBytes() < minBlockSize) {
|
||||
continue;
|
||||
}
|
||||
totalSize += addBlock(curBlock, results);
|
||||
if(timeInterval > 0 && isHotBlock(curBlock, hotTimePos)) {
|
||||
pending.add(curBlock);
|
||||
} else {
|
||||
totalSize += addBlock(curBlock, results);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if the cold block (access before timeInterval) is less than the
|
||||
// asked size, it will add the pending hot block in end of return list.
|
||||
for(int i = 0; i < pending.size() && totalSize < size; i++) {
|
||||
curBlock = pending.get(i);
|
||||
totalSize += addBlock(curBlock, results);
|
||||
}
|
||||
return new BlocksWithLocations(
|
||||
results.toArray(new BlockWithLocations[results.size()]));
|
||||
}
|
||||
|
@ -1893,13 +1893,13 @@ public boolean isInStandbyState() {
|
||||
* @param minimumBlockSize
|
||||
*/
|
||||
public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
|
||||
minimumBlockSize) throws IOException {
|
||||
minimumBlockSize, long timeInterval) throws IOException {
|
||||
checkOperation(OperationCategory.READ);
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
return getBlockManager().getBlocksWithLocations(datanode, size,
|
||||
minimumBlockSize);
|
||||
minimumBlockSize, timeInterval);
|
||||
} finally {
|
||||
readUnlock("getBlocks");
|
||||
}
|
||||
|
@ -651,7 +651,7 @@ private static UserGroupInformation getRemoteUser() throws IOException {
|
||||
/////////////////////////////////////////////////////
|
||||
@Override // NamenodeProtocol
|
||||
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
||||
minBlockSize)
|
||||
minBlockSize, long timeInterval)
|
||||
throws IOException {
|
||||
if(size <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
@ -664,7 +664,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
||||
checkNNStartup();
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
|
||||
return namesystem.getBlocks(datanode, size, minBlockSize);
|
||||
return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
|
@ -74,6 +74,8 @@ public interface NamenodeProtocol {
|
||||
* @param datanode a data node
|
||||
* @param size requested size
|
||||
* @param minBlockSize each block should be of this minimum Block Size
|
||||
* @param hotBlockTimeInterval prefer to get blocks which are belong to
|
||||
* the cold files accessed before the time interval
|
||||
* @return BlocksWithLocations a list of blocks & their locations
|
||||
* @throws IOException if size is less than or equal to 0 or
|
||||
datanode does not exist
|
||||
@ -81,7 +83,7 @@ public interface NamenodeProtocol {
|
||||
@Idempotent
|
||||
@ReadOnly
|
||||
BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
||||
minBlockSize) throws IOException;
|
||||
minBlockSize, long hotBlockTimeInterval) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the current block keys
|
||||
|
@ -47,6 +47,7 @@ message GetBlocksRequestProto {
|
||||
// cause problem during rolling upgrade, when balancers are upgraded later.
|
||||
// For more info refer HDFS-13356
|
||||
optional uint64 minBlockSize = 3 [default = 10485760];
|
||||
optional uint64 timeInterval = 4 [default = 0];
|
||||
}
|
||||
|
||||
|
||||
|
@ -6068,4 +6068,13 @@
|
||||
until capacity is balanced out.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.balancer.getBlocks.hot-time-interval</name>
|
||||
<value>0</value>
|
||||
<description>
|
||||
Balancer prefer moving cold blocks i.e blocks associated with files
|
||||
accessed or modified before the specified time interval.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
@ -301,6 +301,7 @@ Usage:
|
||||
| `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
|
||||
| `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. |
|
||||
| `-asService` | Run Balancer as a long running service. |
|
||||
| `-hotBlockTimeInterval` | Prefer moving cold blocks i.e blocks associated with files accessed or modified before the specified time interval. |
|
||||
| `-h`\|`--help` | Display the tool usage and help information and exit. |
|
||||
|
||||
Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details.
|
||||
|
@ -238,26 +238,26 @@ public void testGetBlocks() throws Exception {
|
||||
DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
|
||||
|
||||
// Should return all 13 blocks, as minBlockSize is not passed
|
||||
locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks();
|
||||
locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
|
||||
assertEquals(blkLocsSize, locs.length);
|
||||
|
||||
assertEquals(locs[0].getStorageIDs().length, replicationFactor);
|
||||
assertEquals(locs[1].getStorageIDs().length, replicationFactor);
|
||||
|
||||
// Should return 12 blocks, as minBlockSize is blkSize
|
||||
locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize).getBlocks();
|
||||
locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0).getBlocks();
|
||||
assertEquals(blkLocsSize - 1, locs.length);
|
||||
assertEquals(locs[0].getStorageIDs().length, replicationFactor);
|
||||
assertEquals(locs[1].getStorageIDs().length, replicationFactor);
|
||||
|
||||
// get blocks of size BlockSize from dataNodes[0]
|
||||
locs = namenode.getBlocks(dataNodes[0], blkSize,
|
||||
blkSize).getBlocks();
|
||||
blkSize, 0).getBlocks();
|
||||
assertEquals(locs.length, 1);
|
||||
assertEquals(locs[0].getStorageIDs().length, replicationFactor);
|
||||
|
||||
// get blocks of size 1 from dataNodes[0]
|
||||
locs = namenode.getBlocks(dataNodes[0], 1, 1).getBlocks();
|
||||
locs = namenode.getBlocks(dataNodes[0], 1, 1, 0).getBlocks();
|
||||
assertEquals(locs.length, 1);
|
||||
assertEquals(locs[0].getStorageIDs().length, replicationFactor);
|
||||
|
||||
@ -282,7 +282,7 @@ public void testGetBlocks() throws Exception {
|
||||
|
||||
// Namenode should refuse to provide block locations to the balancer
|
||||
// while in safemode.
|
||||
locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks();
|
||||
locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
|
||||
assertEquals(blkLocsSize, locs.length);
|
||||
assertFalse(fs.isInSafeMode());
|
||||
LOG.info("Entering safe mode");
|
||||
@ -309,7 +309,7 @@ private void getBlocksWithException(NamenodeProtocol namenode,
|
||||
|
||||
// Namenode should refuse should fail
|
||||
LambdaTestUtils.intercept(exClass,
|
||||
msg, () -> namenode.getBlocks(datanode, size, minBlkSize));
|
||||
msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -396,4 +396,76 @@ public void testBlockKey() {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
private boolean belongToFile(BlockWithLocations blockWithLocations,
|
||||
List<LocatedBlock> blocks) {
|
||||
for(LocatedBlock block : blocks) {
|
||||
if (block.getBlock().getLocalBlock().equals(
|
||||
blockWithLocations.getBlock())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* test GetBlocks with dfs.namenode.hot.block.interval.
|
||||
* Balancer prefer to get blocks which are belong to the cold files
|
||||
* created before this time period.
|
||||
*/
|
||||
@Test
|
||||
public void testGetBlocksWithHotBlockTimeInterval() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
final short repFactor = (short) 1;
|
||||
final int blockNum = 2;
|
||||
final int fileLen = BLOCK_SIZE * blockNum;
|
||||
final long hotInterval = 2000;
|
||||
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
|
||||
numDataNodes(repFactor).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
final DFSClient dfsclient = ((DistributedFileSystem) fs).getClient();
|
||||
|
||||
String fileOld = "/f.old";
|
||||
DFSTestUtil.createFile(fs, new Path(fileOld), fileLen, repFactor, 0);
|
||||
|
||||
List<LocatedBlock> locatedBlocksOld = dfsclient.getNamenode().
|
||||
getBlockLocations(fileOld, 0, fileLen).getLocatedBlocks();
|
||||
DatanodeInfo[] dataNodes = locatedBlocksOld.get(0).getLocations();
|
||||
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost",
|
||||
cluster.getNameNodePort());
|
||||
NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
|
||||
DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
|
||||
|
||||
// make the file as old.
|
||||
dfsclient.getNamenode().setTimes(fileOld, 0, 0);
|
||||
|
||||
String fileNew = "/f.new";
|
||||
DFSTestUtil.createFile(fs, new Path(fileNew), fileLen, repFactor, 0);
|
||||
List<LocatedBlock> locatedBlocksNew = dfsclient.getNamenode()
|
||||
.getBlockLocations(fileNew, 0, fileLen).getLocatedBlocks();
|
||||
|
||||
BlockWithLocations[] locsAll = namenode.getBlocks(
|
||||
dataNodes[0], fileLen*2, 0, hotInterval).getBlocks();
|
||||
assertEquals(locsAll.length, 4);
|
||||
|
||||
for(int i = 0; i < blockNum; i++) {
|
||||
assertTrue(belongToFile(locsAll[i], locatedBlocksOld));
|
||||
}
|
||||
for(int i = blockNum; i < blockNum*2; i++) {
|
||||
assertTrue(belongToFile(locsAll[i], locatedBlocksNew));
|
||||
}
|
||||
|
||||
BlockWithLocations[] locs2 = namenode.getBlocks(
|
||||
dataNodes[0], fileLen*2, 0, hotInterval).getBlocks();
|
||||
for(int i = 0; i < 2; i++) {
|
||||
assertTrue(belongToFile(locs2[i], locatedBlocksOld));
|
||||
}
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
@ -2170,7 +2170,8 @@ public BlocksWithLocations answer(InvocationOnMock invocation)
|
||||
endGetBlocksTime.getAndUpdate((curr) -> Math.max(curr, endTime));
|
||||
numGetBlocksCalls.incrementAndGet();
|
||||
return blk;
|
||||
}}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong());
|
||||
}}).when(fsnSpy).getBlocks(any(DatanodeID.class),
|
||||
anyLong(), anyLong(), anyLong());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -228,7 +228,7 @@ private void testBalancerWithObserver(boolean withObserverFailure)
|
||||
int expectedObserverIdx = withObserverFailure ? 3 : 2;
|
||||
int expectedCount = (i == expectedObserverIdx) ? 2 : 0;
|
||||
verify(namesystemSpies.get(i), times(expectedCount))
|
||||
.getBlocks(any(), anyLong(), anyLong());
|
||||
.getBlocks(any(), anyLong(), anyLong(), anyLong());
|
||||
}
|
||||
} finally {
|
||||
if (qjmhaCluster != null) {
|
||||
|
Loading…
Reference in New Issue
Block a user