HDFS-10999. Introduce separate stats for Replicated and Erasure Coded Blocks apart from the current Aggregated stats. (Manoj Govindassamy via lei)

Lei Xu 2017-06-14 10:44:19 -07:00
parent 6ed54f3439
commit 999c8fcbef
35 changed files with 1867 additions and 369 deletions

View File

@ -1919,12 +1919,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/**
Returns count of blocks with one or more replicas missing.
* Returns aggregated count of blocks with less redundancy.
* @throws IOException
*/
public long getUnderReplicatedBlocksCount() throws IOException {
return getStateByIndex(ClientProtocol.
GET_STATS_UNDER_REPLICATED_IDX);
public long getLowRedundancyBlocksCount() throws IOException {
return getStateByIndex(ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX);
}
/**

View File

@ -1266,12 +1266,12 @@ public class DistributedFileSystem extends FileSystem {
}
/**
* Returns count of blocks with one or more replicas missing.
* Returns aggregated count of blocks with less redundancy.
*
* @throws IOException
*/
public long getUnderReplicatedBlocksCount() throws IOException {
return dfs.getUnderReplicatedBlocksCount();
public long getLowRedundancyBlocksCount() throws IOException {
return dfs.getLowRedundancyBlocksCount();
}
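A minimal caller-side sketch of the renamed accessor, assuming the default filesystem is HDFS; the driver class name is hypothetical and the cast only succeeds when fs.defaultFS points at a DistributedFileSystem:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class LowRedundancyProbe {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    if (fs instanceof DistributedFileSystem) {
      // Aggregated across replicated blocks and EC block groups.
      long lowRedundancy =
          ((DistributedFileSystem) fs).getLowRedundancyBlocksCount();
      System.out.println("Low redundancy blocks: " + lowRedundancy);
    }
    fs.close();
  }
}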
/**

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Get statistics pertaining to blocks of type {@link BlockType#CONTIGUOUS}
* in the filesystem.
* <p>
* @see ClientProtocol#getBlocksStats()
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class BlocksStats {
private final long lowRedundancyBlocksStat;
private final long corruptBlocksStat;
private final long missingBlocksStat;
private final long missingReplicationOneBlocksStat;
private final long bytesInFutureBlocksStat;
private final long pendingDeletionBlocksStat;
public BlocksStats(long lowRedundancyBlocksStat,
long corruptBlocksStat, long missingBlocksStat,
long missingReplicationOneBlocksStat, long bytesInFutureBlocksStat,
long pendingDeletionBlocksStat) {
this.lowRedundancyBlocksStat = lowRedundancyBlocksStat;
this.corruptBlocksStat = corruptBlocksStat;
this.missingBlocksStat = missingBlocksStat;
this.missingReplicationOneBlocksStat = missingReplicationOneBlocksStat;
this.bytesInFutureBlocksStat = bytesInFutureBlocksStat;
this.pendingDeletionBlocksStat = pendingDeletionBlocksStat;
}
public long getLowRedundancyBlocksStat() {
return lowRedundancyBlocksStat;
}
public long getCorruptBlocksStat() {
return corruptBlocksStat;
}
public long getMissingReplicaBlocksStat() {
return missingBlocksStat;
}
public long getMissingReplicationOneBlocksStat() {
return missingReplicationOneBlocksStat;
}
public long getBytesInFutureBlocksStat() {
return bytesInFutureBlocksStat;
}
public long getPendingDeletionBlocksStat() {
return pendingDeletionBlocksStat;
}
@Override
public String toString() {
StringBuilder statsBuilder = new StringBuilder();
statsBuilder.append("ReplicatedBlocksStats=[")
.append("LowRedundancyBlocks=").append(getLowRedundancyBlocksStat())
.append(", CorruptBlocks=").append(getCorruptBlocksStat())
.append(", MissingReplicaBlocks=").append(getMissingReplicaBlocksStat())
.append(", MissingReplicationOneBlocks=").append(
getMissingReplicationOneBlocksStat())
.append(", BytesInFutureBlocks=").append(getBytesInFutureBlocksStat())
.append(", PendingDeletionBlocks=").append(
getPendingDeletionBlocksStat())
.append("]");
return statsBuilder.toString();
}
}
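For orientation, a standalone sketch with made-up numbers showing how the constructor arguments line up with the getters and the toString() format above; the driver class name is hypothetical:

import org.apache.hadoop.hdfs.protocol.BlocksStats;

public class BlocksStatsExample {
  public static void main(String[] args) {
    // Argument order: lowRedundancy, corrupt, missing,
    // missingReplicationOne, bytesInFuture, pendingDeletion.
    BlocksStats stats = new BlocksStats(12L, 3L, 1L, 0L, 0L, 7L);
    System.out.println(stats);                                 // ReplicatedBlocksStats=[...]
    System.out.println(stats.getLowRedundancyBlocksStat());    // 12
    System.out.println(stats.getPendingDeletionBlocksStat());  // 7
  }
}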

View File

@ -731,10 +731,19 @@ public interface ClientProtocol {
@Idempotent
boolean recoverLease(String src, String clientName) throws IOException;
/**
* Constants to index the array of aggregated stats returned by
* {@link #getStats()}.
*/
int GET_STATS_CAPACITY_IDX = 0;
int GET_STATS_USED_IDX = 1;
int GET_STATS_REMAINING_IDX = 2;
/**
* Use {@link #GET_STATS_LOW_REDUNDANCY_IDX} instead.
*/
@Deprecated
int GET_STATS_UNDER_REPLICATED_IDX = 3;
int GET_STATS_LOW_REDUNDANCY_IDX = 3;
int GET_STATS_CORRUPT_BLOCKS_IDX = 4;
int GET_STATS_MISSING_BLOCKS_IDX = 5;
int GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX = 6;
@ -743,26 +752,40 @@ public interface ClientProtocol {
int STATS_ARRAY_LENGTH = 9;
/**
* Get a set of statistics about the filesystem.
* Right now, only eight values are returned.
* Get an array of aggregated statistics combining blocks of both type
* {@link BlockType#CONTIGUOUS} and {@link BlockType#STRIPED} in the
* filesystem. Use public constants like {@link #GET_STATS_CAPACITY_IDX} in
* place of actual numbers to index into the array.
* <ul>
* <li> [0] contains the total storage capacity of the system, in bytes.</li>
* <li> [1] contains the total used space of the system, in bytes.</li>
* <li> [2] contains the available storage of the system, in bytes.</li>
* <li> [3] contains number of under replicated blocks in the system.</li>
* <li> [4] contains number of blocks with a corrupt replica. </li>
* <li> [3] contains number of low redundancy blocks in the system.</li>
* <li> [4] contains number of corrupt blocks. </li>
* <li> [5] contains number of blocks without any good replicas left. </li>
* <li> [6] contains number of blocks which have replication factor
* 1 and have lost the only replica. </li>
* <li> [7] contains number of bytes that are at risk for deletion. </li>
* <li> [8] contains number of pending deletion blocks. </li>
* </ul>
* Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of
* actual numbers to index into the array.
*/
@Idempotent
long[] getStats() throws IOException;
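A hedged sketch of reading the aggregated array with the public index constants instead of bare offsets; how the ClientProtocol proxy is obtained, and the wrapper class name, are assumptions:

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;

final class AggregatedStatsPrinter {
  static void print(ClientProtocol namenode) throws IOException {
    long[] stats = namenode.getStats();
    System.out.println("capacity="
        + stats[ClientProtocol.GET_STATS_CAPACITY_IDX]
        + " remaining=" + stats[ClientProtocol.GET_STATS_REMAINING_IDX]
        + " lowRedundancy="
        + stats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX]
        + " corrupt=" + stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]
        + " missing=" + stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]);
  }
}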
/**
* Get statistics pertaining to blocks of type {@link BlockType#CONTIGUOUS}
* in the filesystem.
*/
@Idempotent
BlocksStats getBlocksStats() throws IOException;
/**
* Get statistics pertaining to blocks of type {@link BlockType#STRIPED}
* in the filesystem.
*/
@Idempotent
ECBlockGroupsStats getECBlockGroupsStats() throws IOException;
/**
* Get a report on the system's current datanodes.
* One DatanodeInfo object is returned for each DataNode.

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Get statistics pertaining to blocks of type {@link BlockType#STRIPED}
* in the filesystem.
* <p>
* @see ClientProtocol#getECBlockGroupsStats()
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class ECBlockGroupsStats {
private final long lowRedundancyBlockGroupsStat;
private final long corruptBlockGroupsStat;
private final long missingBlockGroupsStat;
private final long bytesInFutureBlockGroupsStat;
private final long pendingDeletionBlockGroupsStat;
public ECBlockGroupsStats(long lowRedundancyBlockGroupsStat, long
corruptBlockGroupsStat, long missingBlockGroupsStat, long
bytesInFutureBlockGroupsStat, long pendingDeletionBlockGroupsStat) {
this.lowRedundancyBlockGroupsStat = lowRedundancyBlockGroupsStat;
this.corruptBlockGroupsStat = corruptBlockGroupsStat;
this.missingBlockGroupsStat = missingBlockGroupsStat;
this.bytesInFutureBlockGroupsStat = bytesInFutureBlockGroupsStat;
this.pendingDeletionBlockGroupsStat = pendingDeletionBlockGroupsStat;
}
public long getBytesInFutureBlockGroupsStat() {
return bytesInFutureBlockGroupsStat;
}
public long getCorruptBlockGroupsStat() {
return corruptBlockGroupsStat;
}
public long getLowRedundancyBlockGroupsStat() {
return lowRedundancyBlockGroupsStat;
}
public long getMissingBlockGroupsStat() {
return missingBlockGroupsStat;
}
public long getPendingDeletionBlockGroupsStat() {
return pendingDeletionBlockGroupsStat;
}
@Override
public String toString() {
StringBuilder statsBuilder = new StringBuilder();
statsBuilder.append("ECBlockGroupsStats=[")
.append("LowRedundancyBlockGroups=").append(
getLowRedundancyBlockGroupsStat())
.append(", CorruptBlockGroups=").append(getCorruptBlockGroupsStat())
.append(", MissingBlockGroups=").append(getMissingBlockGroupsStat())
.append(", BytesInFutureBlockGroups=").append(
getBytesInFutureBlockGroupsStat())
.append(", PendingDeletionBlockGroups=").append(
getPendingDeletionBlockGroupsStat())
.append("]");
return statsBuilder.toString();
}
}
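By analogy with the replicated variant, a short sketch of fetching and logging the striped-block statistics through the new ClientProtocol method; obtaining the proxy and the wrapper class name are assumptions:

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;

final class ECBlockGroupsStatsPrinter {
  static void print(ClientProtocol namenode) throws IOException {
    ECBlockGroupsStats ecStats = namenode.getECBlockGroupsStats();
    // toString() above renders all five counters.
    System.out.println(ecStats);
    if (ecStats.getMissingBlockGroupsStat() > 0) {
      System.err.println("Missing EC block groups detected");
    }
  }
}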

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -70,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@ -114,6 +116,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFil
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupsStatsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsBlocksStatsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetResponseProto;
@ -228,6 +232,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
private final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST =
GetFsStatusRequestProto.newBuilder().build();
private final static GetFsBlocksStatsRequestProto
VOID_GET_FS_REPLICABLOCKS_STATS_REQUEST =
GetFsBlocksStatsRequestProto.newBuilder().build();
private final static GetFsECBlockGroupsStatsRequestProto
VOID_GET_FS_ECBLOCKGROUPS_STATS_REQUEST =
GetFsECBlockGroupsStatsRequestProto.newBuilder().build();
private final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST =
RollEditsRequestProto.getDefaultInstance();
@ -668,6 +680,26 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
@Override
public BlocksStats getBlocksStats() throws IOException {
try {
return PBHelperClient.convert(rpcProxy.getFsBlocksStats(null,
VOID_GET_FS_REPLICABLOCKS_STATS_REQUEST));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public ECBlockGroupsStats getECBlockGroupsStats() throws IOException {
try {
return PBHelperClient.convert(rpcProxy.getFsECBlockGroupsStats(null,
VOID_GET_FS_ECBLOCKGROUPS_STATS_REQUEST));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -89,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@ -115,6 +117,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Create
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsBlocksStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
@ -1717,6 +1721,21 @@ public class PBHelperClient {
return result;
}
public static BlocksStats convert(
GetFsBlocksStatsResponseProto res) {
return new BlocksStats(res.getLowRedundancy(),
res.getCorruptBlocks(), res.getMissingBlocks(),
res.getMissingReplOneBlocks(), res.getBlocksInFuture(),
res.getPendingDeletionBlocks());
}
public static ECBlockGroupsStats convert(
GetFsECBlockGroupsStatsResponseProto res) {
return new ECBlockGroupsStats(res.getLowRedundancy(),
res.getCorruptBlocks(), res.getMissingBlocks(),
res.getBlocksInFuture(), res.getPendingDeletionBlocks());
}
public static DatanodeReportTypeProto convert(DatanodeReportType t) {
switch (t) {
case ALL: return DatanodeReportTypeProto.ALL;
@ -2124,6 +2143,40 @@ public class PBHelperClient {
return result.build();
}
public static GetFsBlocksStatsResponseProto convert(
BlocksStats blocksStats) {
GetFsBlocksStatsResponseProto.Builder result =
GetFsBlocksStatsResponseProto.newBuilder();
result.setLowRedundancy(
blocksStats.getLowRedundancyBlocksStat());
result.setCorruptBlocks(
blocksStats.getCorruptBlocksStat());
result.setMissingBlocks(
blocksStats.getMissingReplicaBlocksStat());
result.setMissingReplOneBlocks(
blocksStats.getMissingReplicationOneBlocksStat());
result.setBlocksInFuture(
blocksStats.getBytesInFutureBlocksStat());
result.setPendingDeletionBlocks(
blocksStats.getPendingDeletionBlocksStat());
return result.build();
}
public static GetFsECBlockGroupsStatsResponseProto convert(
ECBlockGroupsStats ecBlockGroupsStats) {
GetFsECBlockGroupsStatsResponseProto.Builder result =
GetFsECBlockGroupsStatsResponseProto.newBuilder();
result.setLowRedundancy(
ecBlockGroupsStats.getLowRedundancyBlockGroupsStat());
result.setCorruptBlocks(ecBlockGroupsStats.getCorruptBlockGroupsStat());
result.setMissingBlocks(ecBlockGroupsStats.getMissingBlockGroupsStat());
result.setBlocksInFuture(
ecBlockGroupsStats.getBytesInFutureBlockGroupsStat());
result.setPendingDeletionBlocks(
ecBlockGroupsStats.getPendingDeletionBlockGroupsStat());
return result.build();
}
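The two directions of convert() above are inverses; a small test-style sketch (hypothetical class name, made-up values) round-trips a BlocksStats value through its protobuf response message:

import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsBlocksStatsResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

public class BlocksStatsRoundTrip {
  public static void main(String[] args) {
    BlocksStats original = new BlocksStats(5L, 2L, 1L, 0L, 0L, 9L);
    GetFsBlocksStatsResponseProto proto = PBHelperClient.convert(original);
    BlocksStats copy = PBHelperClient.convert(proto);
    // BlocksStats does not override equals(), so compare field by field.
    if (original.getLowRedundancyBlocksStat()
        != copy.getLowRedundancyBlocksStat()
        || original.getPendingDeletionBlocksStat()
        != copy.getPendingDeletionBlocksStat()) {
      throw new AssertionError("round trip changed the stats");
    }
  }
}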
public static DatanodeReportType convert(DatanodeReportTypeProto t) {
switch (t) {
case ALL: return DatanodeReportType.ALL;

View File

@ -327,6 +327,29 @@ message GetFsStatsResponseProto {
optional uint64 pending_deletion_blocks = 9;
}
message GetFsBlocksStatsRequestProto { // no input parameters
}
message GetFsBlocksStatsResponseProto {
required uint64 low_redundancy = 1;
required uint64 corrupt_blocks = 2;
required uint64 missing_blocks = 3;
required uint64 missing_repl_one_blocks = 4;
required uint64 blocks_in_future = 5;
required uint64 pending_deletion_blocks = 6;
}
message GetFsECBlockGroupsStatsRequestProto { // no input parameters
}
message GetFsECBlockGroupsStatsResponseProto {
required uint64 low_redundancy = 1;
required uint64 corrupt_blocks = 2;
required uint64 missing_blocks = 3;
required uint64 blocks_in_future = 4;
required uint64 pending_deletion_blocks = 5;
}
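For reference, a hedged sketch of building the EC response message directly through its generated Java builder; the setter names mirror the usage in PBHelperClient above and the values are illustrative:

import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupsStatsResponseProto;

public class ECStatsProtoExample {
  public static void main(String[] args) {
    GetFsECBlockGroupsStatsResponseProto resp =
        GetFsECBlockGroupsStatsResponseProto.newBuilder()
            .setLowRedundancy(4L)          // low_redundancy = 1
            .setCorruptBlocks(1L)          // corrupt_blocks = 2
            .setMissingBlocks(0L)          // missing_blocks = 3
            .setBlocksInFuture(0L)         // blocks_in_future = 4
            .setPendingDeletionBlocks(2L)  // pending_deletion_blocks = 5
            .build();
    System.out.println(resp);  // text-format dump of the message
  }
}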
enum DatanodeReportTypeProto { // type of the datanode report
ALL = 1;
LIVE = 2;
@ -792,6 +815,10 @@ service ClientNamenodeProtocol {
rpc recoverLease(RecoverLeaseRequestProto)
returns(RecoverLeaseResponseProto);
rpc getFsStats(GetFsStatusRequestProto) returns(GetFsStatsResponseProto);
rpc getFsBlocksStats(GetFsBlocksStatsRequestProto)
returns (GetFsBlocksStatsResponseProto);
rpc getFsECBlockGroupsStats(GetFsECBlockGroupsStatsRequestProto)
returns (GetFsECBlockGroupsStatsResponseProto);
rpc getDatanodeReport(GetDatanodeReportRequestProto)
returns(GetDatanodeReportResponseProto);
rpc getDatanodeStorageReport(GetDatanodeStorageReportRequestProto)

View File

@ -124,7 +124,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFil
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsBlocksStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsBlocksStatsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupsStatsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto;
@ -745,6 +749,28 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
}
@Override
public GetFsBlocksStatsResponseProto getFsBlocksStats(
RpcController controller, GetFsBlocksStatsRequestProto request)
throws ServiceException {
try {
return PBHelperClient.convert(server.getBlocksStats());
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetFsECBlockGroupsStatsResponseProto getFsECBlockGroupsStats(
RpcController controller, GetFsECBlockGroupsStatsRequestProto request)
throws ServiceException {
try {
return PBHelperClient.convert(server.getECBlockGroupsStats());
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetDatanodeReportResponseProto getDatanodeReport(
RpcController controller, GetDatanodeReportRequestProto req)

View File

@ -195,7 +195,7 @@ public class BlockManager implements BlockStatsMXBean {
return pendingReconstructionBlocksCount;
}
/** Used by metrics */
public long getUnderReplicatedBlocksCount() {
public long getLowRedundancyBlocksCount() {
return lowRedundancyBlocksCount;
}
/** Used by metrics */
@ -231,6 +231,51 @@ public class BlockManager implements BlockStatsMXBean {
return pendingReconstruction.getNumTimedOuts();
}
/** Used by metrics. */
public long getLowRedundancyBlocksStat() {
return neededReconstruction.getLowRedundancyBlocksStat();
}
/** Used by metrics. */
public long getCorruptBlocksStat() {
return corruptReplicas.getCorruptBlocksStat();
}
/** Used by metrics. */
public long getMissingBlocksStat() {
return neededReconstruction.getCorruptBlocksStat();
}
/** Used by metrics. */
public long getMissingReplicationOneBlocksStat() {
return neededReconstruction.getCorruptReplicationOneBlocksStat();
}
/** Used by metrics. */
public long getPendingDeletionBlocksStat() {
return invalidateBlocks.getBlocksStat();
}
/** Used by metrics. */
public long getLowRedundancyECBlockGroupsStat() {
return neededReconstruction.getLowRedundancyECBlockGroupsStat();
}
/** Used by metrics. */
public long getCorruptECBlockGroupsStat() {
return corruptReplicas.getCorruptECBlockGroupsStat();
}
/** Used by metrics. */
public long getMissingECBlockGroupsStat() {
return neededReconstruction.getCorruptECBlockGroupsStat();
}
/** Used by metrics. */
public long getPendingDeletionECBlockGroupsStat() {
return invalidateBlocks.getECBlockGroupsStat();
}
/**
* redundancyRecheckInterval is how often namenode checks for new
* reconstruction work.
@ -2244,6 +2289,14 @@ public class BlockManager implements BlockStatsMXBean {
return bmSafeMode.getBytesInFuture();
}
public long getBytesInFutureReplicatedBlocksStat() {
return bmSafeMode.getBytesInFutureBlocks();
}
public long getBytesInFutureStripedBlocksStat() {
return bmSafeMode.getBytesInFutureECBlockGroups();
}
/**
* Removes the blocks from blocksmap and updates the safemode blocks total.
* @param blocks An instance of {@link BlocksMapUpdateInfo} which contains a
@ -4245,7 +4298,7 @@ public class BlockManager implements BlockStatsMXBean {
public long getMissingReplOneBlocksCount() {
// not locking
return this.neededReconstruction.getCorruptReplOneBlockSize();
return this.neededReconstruction.getCorruptReplicationOneBlockSize();
}
public BlockInfo addBlockCollection(BlockInfo block,

View File

@ -41,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@ -116,7 +117,9 @@ class BlockManagerSafeMode {
private Counter awaitingReportedBlocksCounter;
/** Keeps track of how many bytes are in Future Generation blocks. */
private final AtomicLong numberOfBytesInFutureBlocks = new AtomicLong();
private final LongAdder bytesInFutureBlocks = new LongAdder();
private final LongAdder bytesInFutureECBlockGroups = new LongAdder();
/** Reports if Name node was started with Rollback option. */
private final boolean inRollBack;
@ -358,12 +361,13 @@ class BlockManagerSafeMode {
boolean leaveSafeMode(boolean force) {
assert namesystem.hasWriteLock() : "Leaving safe mode needs write lock!";
final long bytesInFuture = numberOfBytesInFutureBlocks.get();
final long bytesInFuture = getBytesInFuture();
if (bytesInFuture > 0) {
if (force) {
LOG.warn("Leaving safe mode due to forceExit. This will cause a data "
+ "loss of {} byte(s).", bytesInFuture);
numberOfBytesInFutureBlocks.set(0);
bytesInFutureBlocks.reset();
bytesInFutureECBlockGroups.reset();
} else {
LOG.error("Refusing to leave safe mode without a force flag. " +
"Exiting safe mode will cause a deletion of {} byte(s). Please " +
@ -481,9 +485,12 @@ class BlockManagerSafeMode {
}
if (!blockManager.getShouldPostponeBlocksFromFuture() &&
!inRollBack &&
blockManager.isGenStampInFuture(brr)) {
numberOfBytesInFutureBlocks.addAndGet(brr.getBytesOnDisk());
!inRollBack && blockManager.isGenStampInFuture(brr)) {
if (BlockIdManager.isStripedBlockID(brr.getBlockId())) {
bytesInFutureECBlockGroups.add(brr.getBytesOnDisk());
} else {
bytesInFutureBlocks.add(brr.getBytesOnDisk());
}
}
}
@ -494,7 +501,15 @@ class BlockManagerSafeMode {
* @return Bytes in future
*/
long getBytesInFuture() {
return numberOfBytesInFutureBlocks.get();
return getBytesInFutureBlocks() + getBytesInFutureECBlockGroups();
}
long getBytesInFutureBlocks() {
return bytesInFutureBlocks.longValue();
}
long getBytesInFutureECBlockGroups() {
return bytesInFutureECBlockGroups.longValue();
}
void close() {

View File

@ -17,17 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.Server;
@ -58,6 +57,9 @@ public class CorruptReplicasMap{
private final Map<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
new HashMap<Block, Map<DatanodeDescriptor, Reason>>();
private final LongAdder totalCorruptBlocks = new LongAdder();
private final LongAdder totalCorruptECBlockGroups = new LongAdder();
/**
* Mark the block belonging to datanode as corrupt.
*
@ -72,6 +74,7 @@ public class CorruptReplicasMap{
if (nodes == null) {
nodes = new HashMap<DatanodeDescriptor, Reason>();
corruptReplicasMap.put(blk, nodes);
incrementBlockStat(blk);
}
String reasonText;
@ -97,13 +100,15 @@ public class CorruptReplicasMap{
}
/**
* Remove Block from CorruptBlocksMap
*
* Remove Block from CorruptBlocksMap.
* @param blk Block to be removed
*/
void removeFromCorruptReplicasMap(Block blk) {
if (corruptReplicasMap != null) {
corruptReplicasMap.remove(blk);
Map<DatanodeDescriptor, Reason> value = corruptReplicasMap.remove(blk);
if (value != null) {
decrementBlockStat(blk);
}
}
}
@ -121,8 +126,9 @@ public class CorruptReplicasMap{
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
Reason reason) {
Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
if (datanodes==null)
if (datanodes == null) {
return false;
}
// if reasons can be compared but don't match, return false.
Reason storedReason = datanodes.get(datanode);
@ -135,12 +141,28 @@ public class CorruptReplicasMap{
if (datanodes.isEmpty()) {
// remove the block if there is no more corrupted replicas
corruptReplicasMap.remove(blk);
decrementBlockStat(blk);
}
return true;
}
return false;
}
private void incrementBlockStat(Block block) {
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
totalCorruptECBlockGroups.increment();
} else {
totalCorruptBlocks.increment();
}
}
private void decrementBlockStat(Block block) {
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
totalCorruptECBlockGroups.decrement();
} else {
totalCorruptBlocks.decrement();
}
}
/**
* Get Nodes which have corrupt replicas of Block
@ -188,49 +210,30 @@ public class CorruptReplicasMap{
* @param startingBlockId Block id from which to start. If null, start at
* beginning.
* @return Up to numExpectedBlocks blocks from startingBlockId if it exists
*
*/
@VisibleForTesting
long[] getCorruptReplicaBlockIdsForTesting(int numExpectedBlocks,
Long startingBlockId) {
long[] getCorruptBlockIdsForTesting(BlockType blockType,
int numExpectedBlocks, Long startingBlockId) {
if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
return null;
}
Iterator<Block> blockIt =
new TreeMap<>(corruptReplicasMap).keySet().iterator();
// if the starting block id was specified, iterate over keys until
// we find the matching block. If we find a matching block, break
// to leave the iterator on the next block after the specified block.
if (startingBlockId != null) {
boolean isBlockFound = false;
while (blockIt.hasNext()) {
Block b = blockIt.next();
if (b.getBlockId() == startingBlockId) {
isBlockFound = true;
break;
}
}
if (!isBlockFound) {
return null;
}
}
ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();
// append up to numExpectedBlocks blockIds to our list
for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
corruptReplicaBlockIds.add(blockIt.next().getBlockId());
}
long[] ret = new long[corruptReplicaBlockIds.size()];
for(int i=0; i<ret.length; i++) {
ret[i] = corruptReplicaBlockIds.get(i);
}
return ret;
long cursorBlockId =
startingBlockId != null ? startingBlockId : Long.MIN_VALUE;
return corruptReplicasMap.keySet()
.stream()
.filter(r -> {
if (blockType == BlockType.STRIPED) {
return BlockIdManager.isStripedBlockID(r.getBlockId()) &&
r.getBlockId() >= cursorBlockId;
} else {
return !BlockIdManager.isStripedBlockID(r.getBlockId()) &&
r.getBlockId() >= cursorBlockId;
}
})
.sorted()
.limit(numExpectedBlocks)
.mapToLong(Block::getBlockId)
.toArray();
}
/**
@ -263,4 +266,12 @@ public class CorruptReplicasMap{
return null;
}
}
long getCorruptBlocksStat() {
return totalCorruptBlocks.longValue();
}
long getCorruptECBlockGroupsStat() {
return totalCorruptECBlockGroups.longValue();
}
}

View File

@ -23,8 +23,11 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -47,12 +50,12 @@ import org.slf4j.Logger;
*/
@InterfaceAudience.Private
class InvalidateBlocks {
/** Mapping: DatanodeInfo -> Collection of Blocks */
private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks =
new HashMap<DatanodeInfo, LightWeightHashSet<Block>>();
/** The total number of blocks in the map. */
private long numBlocks = 0L;
private final Map<DatanodeInfo, LightWeightHashSet<Block>>
nodeToBlocks = new HashMap<>();
private final Map<DatanodeInfo, LightWeightHashSet<Block>>
nodeToECBlockGroups = new HashMap<>();
private final LongAdder numBlocks = new LongAdder();
private final LongAdder numECBlockGroups = new LongAdder();
private final int blockInvalidateLimit;
/**
@ -80,11 +83,73 @@ class InvalidateBlocks {
sdf.format(calendar.getTime()));
}
/** @return the number of blocks to be invalidated . */
synchronized long numBlocks() {
return numBlocks;
/**
* @return The total number of blocks to be invalidated.
*/
long numBlocks() {
return getECBlockGroupsStat() + getBlocksStat();
}
/**
* @return The total number of blocks of type
* {@link org.apache.hadoop.hdfs.protocol.BlockType#CONTIGUOUS}
* to be invalidated.
*/
long getBlocksStat() {
return numBlocks.longValue();
}
/**
* @return The total number of blocks of type
* {@link org.apache.hadoop.hdfs.protocol.BlockType#STRIPED}
* to be invalidated.
*/
long getECBlockGroupsStat() {
return numECBlockGroups.longValue();
}
private LightWeightHashSet<Block> getBlocksSet(final DatanodeInfo dn) {
if (nodeToBlocks.containsKey(dn)) {
return nodeToBlocks.get(dn);
}
return null;
}
private LightWeightHashSet<Block> getECBlockGroupsSet(final DatanodeInfo dn) {
if (nodeToECBlockGroups.containsKey(dn)) {
return nodeToECBlockGroups.get(dn);
}
return null;
}
private LightWeightHashSet<Block> getBlocksSet(final DatanodeInfo dn,
final Block block) {
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
return getECBlockGroupsSet(dn);
} else {
return getBlocksSet(dn);
}
}
private void putBlocksSet(final DatanodeInfo dn, final Block block,
final LightWeightHashSet set) {
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
assert getECBlockGroupsSet(dn) == null;
nodeToECBlockGroups.put(dn, set);
} else {
assert getBlocksSet(dn) == null;
nodeToBlocks.put(dn, set);
}
}
private long getBlockSetsSize(final DatanodeInfo dn) {
LightWeightHashSet<Block> replicaBlocks = getBlocksSet(dn);
LightWeightHashSet<Block> stripedBlocks = getECBlockGroupsSet(dn);
return ((replicaBlocks == null ? 0 : replicaBlocks.size()) +
(stripedBlocks == null ? 0 : stripedBlocks.size()));
}
/**
* @return true if the given storage has the given block listed for
* invalidation. Blocks are compared including their generation stamps:
@ -92,7 +157,7 @@ class InvalidateBlocks {
* returns false.
*/
synchronized boolean contains(final DatanodeInfo dn, final Block block) {
final LightWeightHashSet<Block> s = node2blocks.get(dn);
final LightWeightHashSet<Block> s = getBlocksSet(dn, block);
if (s == null) {
return false; // no invalidate blocks for this storage ID
}
@ -102,18 +167,22 @@ class InvalidateBlocks {
}
/**
* Add a block to the block collection
* which will be invalidated on the specified datanode.
* Add a block to the block collection which will be
* invalidated on the specified datanode.
*/
synchronized void add(final Block block, final DatanodeInfo datanode,
final boolean log) {
LightWeightHashSet<Block> set = node2blocks.get(datanode);
LightWeightHashSet<Block> set = getBlocksSet(datanode, block);
if (set == null) {
set = new LightWeightHashSet<Block>();
node2blocks.put(datanode, set);
set = new LightWeightHashSet<>();
putBlocksSet(datanode, block, set);
}
if (set.add(block)) {
numBlocks++;
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
numECBlockGroups.increment();
} else {
numBlocks.increment();
}
if (log) {
NameNode.blockStateChangeLog.debug("BLOCK* {}: add {} to {}",
getClass().getSimpleName(), block, datanode);
@ -123,44 +192,61 @@ class InvalidateBlocks {
/** Remove a storage from the invalidatesSet */
synchronized void remove(final DatanodeInfo dn) {
final LightWeightHashSet<Block> blocks = node2blocks.remove(dn);
if (blocks != null) {
numBlocks -= blocks.size();
LightWeightHashSet<Block> replicaBlockSets = nodeToBlocks.remove(dn);
if (replicaBlockSets != null) {
numBlocks.add(replicaBlockSets.size() * -1);
}
LightWeightHashSet<Block> blockGroupSets = nodeToECBlockGroups.remove(dn);
if (blockGroupSets != null) {
numECBlockGroups.add(blockGroupSets.size() * -1);
}
}
/** Remove the block from the specified storage. */
synchronized void remove(final DatanodeInfo dn, final Block block) {
final LightWeightHashSet<Block> v = node2blocks.get(dn);
final LightWeightHashSet<Block> v = getBlocksSet(dn, block);
if (v != null && v.remove(block)) {
numBlocks--;
if (v.isEmpty()) {
node2blocks.remove(dn);
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
numECBlockGroups.decrement();
} else {
numBlocks.decrement();
}
if (v.isEmpty() && getBlockSetsSize(dn) == 0) {
remove(dn);
}
}
}
/** Print the contents to out. */
synchronized void dump(final PrintWriter out) {
final int size = node2blocks.values().size();
out.println("Metasave: Blocks " + numBlocks
+ " waiting deletion from " + size + " datanodes.");
if (size == 0) {
return;
}
for(Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
private void dumpBlockSet(final Map<DatanodeInfo,
LightWeightHashSet<Block>> nodeToBlocksMap, final PrintWriter out) {
for(Entry<DatanodeInfo, LightWeightHashSet<Block>> entry :
nodeToBlocksMap.entrySet()) {
final LightWeightHashSet<Block> blocks = entry.getValue();
if (blocks.size() > 0) {
if (blocks != null && blocks.size() > 0) {
out.println(entry.getKey());
out.println(StringUtils.join(',', blocks));
}
}
}
/** Print the contents to out. */
synchronized void dump(final PrintWriter out) {
final int size = nodeToBlocks.values().size() +
nodeToECBlockGroups.values().size();
out.println("Metasave: Blocks " + numBlocks()
+ " waiting deletion from " + size + " datanodes.");
if (size == 0) {
return;
}
dumpBlockSet(nodeToBlocks, out);
dumpBlockSet(nodeToECBlockGroups, out);
}
/** @return a list of the storage IDs. */
synchronized List<DatanodeInfo> getDatanodes() {
return new ArrayList<DatanodeInfo>(node2blocks.keySet());
HashSet<DatanodeInfo> set = new HashSet<>();
set.addAll(nodeToBlocks.keySet());
set.addAll(nodeToECBlockGroups.keySet());
return new ArrayList<>(set);
}
/**
@ -171,6 +257,22 @@ class InvalidateBlocks {
return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
}
/**
* Poll up to {@code limit} blocks from the given set into {@code toInvalidate},
* since the number of blocks that can be sent in one message is limited.
* @return the remaining limit
*/
private int getBlocksToInvalidateByLimit(LightWeightHashSet<Block> blockSet,
List<Block> toInvalidate, LongAdder statsAdder, int limit) {
assert blockSet != null;
int remainingLimit = limit;
List<Block> polledBlocks = blockSet.pollN(limit);
remainingLimit -= polledBlocks.size();
toInvalidate.addAll(polledBlocks);
statsAdder.add(polledBlocks.size() * -1);
return remainingLimit;
}
synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
final long delay = getInvalidationDelay();
if (delay > 0) {
@ -179,27 +281,29 @@ class InvalidateBlocks {
+ "The deletion will start after {} ms.", delay);
return null;
}
final LightWeightHashSet<Block> set = node2blocks.get(dn);
if (set == null) {
return null;
int remainingLimit = blockInvalidateLimit;
final List<Block> toInvalidate = new ArrayList<>();
if (nodeToBlocks.get(dn) != null) {
remainingLimit = getBlocksToInvalidateByLimit(nodeToBlocks.get(dn),
toInvalidate, numBlocks, remainingLimit);
}
// # blocks that can be sent in one message is limited
final int limit = blockInvalidateLimit;
final List<Block> toInvalidate = set.pollN(limit);
// If we send everything in this message, remove this node entry
if (set.isEmpty()) {
if ((remainingLimit > 0) && (nodeToECBlockGroups.get(dn) != null)) {
getBlocksToInvalidateByLimit(nodeToECBlockGroups.get(dn),
toInvalidate, numECBlockGroups, remainingLimit);
}
if (toInvalidate.size() > 0 && getBlockSetsSize(dn) == 0) {
remove(dn);
}
dn.addBlocksToBeInvalidated(toInvalidate);
numBlocks -= toInvalidate.size();
return toInvalidate;
}
synchronized void clear() {
node2blocks.clear();
numBlocks = 0;
nodeToBlocks.clear();
nodeToECBlockGroups.clear();
numBlocks.reset();
numECBlockGroups.reset();
}
}
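To make the limit handling in invalidateWork() above concrete with illustrative numbers: if blockInvalidateLimit is 1000 and a datanode has 600 replicated blocks plus 700 EC block groups queued for invalidation, the replicated set is polled first (600 blocks, leaving a remaining limit of 400), then 400 EC block groups are polled, so a single heartbeat carries 1000 entries and 300 EC block groups stay queued for a later round.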

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
@ -85,7 +86,12 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
= new ArrayList<>(LEVEL);
/** The number of corrupt blocks with replication factor 1 */
private int corruptReplOneBlocks = 0;
private final LongAdder lowRedundancyBlocks = new LongAdder();
private final LongAdder corruptBlocks = new LongAdder();
private final LongAdder corruptReplicationOneBlocks = new LongAdder();
private final LongAdder lowRedundancyECBlockGroups = new LongAdder();
private final LongAdder corruptECBlockGroups = new LongAdder();
/** Create an object. */
LowRedundancyBlocks() {
@ -101,7 +107,11 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.get(i).clear();
}
corruptReplOneBlocks = 0;
lowRedundancyBlocks.reset();
corruptBlocks.reset();
corruptReplicationOneBlocks.reset();
lowRedundancyECBlockGroups.reset();
corruptECBlockGroups.reset();
}
/** Return the total number of insufficient redundancy blocks. */
@ -133,8 +143,35 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
}
/** Return the number of corrupt blocks with replication factor 1 */
synchronized int getCorruptReplOneBlockSize() {
return corruptReplOneBlocks;
long getCorruptReplicationOneBlockSize() {
return getCorruptReplicationOneBlocksStat();
}
/**
* Return under replicated block count excluding corrupt replicas.
*/
long getLowRedundancyBlocksStat() {
return lowRedundancyBlocks.longValue() - getCorruptBlocksStat();
}
long getCorruptBlocksStat() {
return corruptBlocks.longValue();
}
long getCorruptReplicationOneBlocksStat() {
return corruptReplicationOneBlocks.longValue();
}
/**
* Return low redundancy striped blocks excluding corrupt blocks.
*/
long getLowRedundancyECBlockGroupsStat() {
return lowRedundancyECBlockGroups.longValue() -
getCorruptECBlockGroupsStat();
}
long getCorruptECBlockGroupsStat() {
return corruptECBlockGroups.longValue();
}
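As a worked example of the bookkeeping above: if ten replicated blocks are queued and four of them sit in QUEUE_WITH_CORRUPT_BLOCKS, then lowRedundancyBlocks is 10 and corruptBlocks is 4, so getLowRedundancyBlocksStat() reports 6 and getCorruptBlocksStat() reports 4; the striped counters behave the same way for EC block groups.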
/** Check if a block is in the neededReconstruction queue. */
@ -236,11 +273,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
int outOfServiceReplicas, int expectedReplicas) {
final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
outOfServiceReplicas, expectedReplicas);
if(priorityQueues.get(priLevel).add(block)) {
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
expectedReplicas == 1) {
corruptReplOneBlocks++;
}
if(add(block, priLevel, expectedReplicas)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.LowRedundancyBlock.add: {}"
+ " has only {} replicas and need {} replicas so is added to"
@ -252,18 +285,43 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
return false;
}
private boolean add(BlockInfo blockInfo, int priLevel, int expectedReplicas) {
if (priorityQueues.get(priLevel).add(blockInfo)) {
incrementBlockStat(blockInfo, priLevel, expectedReplicas);
return true;
}
return false;
}
private void incrementBlockStat(BlockInfo blockInfo, int priLevel,
int expectedReplicas) {
if (blockInfo.isStriped()) {
lowRedundancyECBlockGroups.increment();
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS) {
corruptECBlockGroups.increment();
}
} else {
lowRedundancyBlocks.increment();
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS) {
corruptBlocks.increment();
if (expectedReplicas == 1) {
corruptReplicationOneBlocks.increment();
}
}
}
}
/** Remove a block from a low redundancy queue. */
synchronized boolean remove(BlockInfo block,
int oldReplicas, int oldReadOnlyReplicas,
int outOfServiceReplicas, int oldExpectedReplicas) {
final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
outOfServiceReplicas, oldExpectedReplicas);
boolean removedBlock = remove(block, priLevel);
boolean removedBlock = remove(block, priLevel, oldExpectedReplicas);
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
oldExpectedReplicas == 1 &&
removedBlock) {
corruptReplOneBlocks--;
assert corruptReplOneBlocks >= 0 :
assert corruptReplicationOneBlocks.longValue() >= 0 :
"Number of corrupt blocks with replication factor 1 " +
"should be non-negative";
}
@ -287,12 +345,17 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
* queues
*/
boolean remove(BlockInfo block, int priLevel) {
return remove(block, priLevel, block.getReplication());
}
boolean remove(BlockInfo block, int priLevel, int oldExpectedReplicas) {
if(priLevel >= 0 && priLevel < LEVEL
&& priorityQueues.get(priLevel).remove(block)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block {}"
+ " from priority queue {}",
block, priLevel);
decrementBlockStat(block, priLevel, oldExpectedReplicas);
return true;
} else {
// Try to remove the block from all queues if the block was
@ -302,6 +365,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block" +
" {} from priority queue {}", block, i);
decrementBlockStat(block, priLevel, oldExpectedReplicas);
return true;
}
}
@ -309,6 +373,27 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
return false;
}
private void decrementBlockStat(BlockInfo blockInfo, int priLevel,
int oldExpectedReplicas) {
if (blockInfo.isStriped()) {
lowRedundancyECBlockGroups.decrement();
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS) {
corruptECBlockGroups.decrement();
}
} else {
lowRedundancyBlocks.decrement();
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS) {
corruptBlocks.decrement();
if (oldExpectedReplicas == 1) {
corruptReplicationOneBlocks.decrement();
assert corruptReplicationOneBlocks.longValue() >= 0 :
"Number of corrupt blocks with replication factor 1 " +
"should be non-negative";
}
}
}
}
/**
* Recalculate and potentially update the priority level of a block.
*
@ -348,8 +433,8 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
}
// oldPri is mostly correct, but not always. If not found with oldPri,
// other levels will be searched until the block is found & removed.
remove(block, oldPri);
if(priorityQueues.get(curPri).add(block)) {
remove(block, oldPri, oldExpectedReplicas);
if(add(block, curPri, curExpectedReplicas)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} "
+ "replicas and needs {} replicas so is added to "
@ -357,18 +442,6 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
block, curReplicas, curExpectedReplicas, curPri);
}
if (oldPri != curPri || expectedReplicasDelta != 0) {
// corruptReplOneBlocks could possibly change
if (curPri == QUEUE_WITH_CORRUPT_BLOCKS &&
curExpectedReplicas == 1) {
// add a new corrupt block with replication factor 1
corruptReplOneBlocks++;
} else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS &&
curExpectedReplicas - expectedReplicasDelta == 1) {
// remove an existing corrupt block with replication factor 1
corruptReplOneBlocks--;
}
}
}
/**

View File

@ -89,6 +89,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
@ -240,8 +242,10 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
import org.apache.hadoop.hdfs.server.namenode.metrics.ECBlockGroupsStatsMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksStatsMBean;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
@ -335,7 +339,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
@InterfaceAudience.Private
@Metrics(context="dfs")
public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NameNodeMXBean {
NameNodeMXBean, ReplicatedBlocksStatsMBean, ECBlockGroupsStatsMBean {
public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
private final MetricsRegistry registry = new MetricsRegistry("FSNamesystem");
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
@ -4005,9 +4009,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/** @see ClientProtocol#getStats() */
long[] getStats() {
final long[] stats = datanodeStatistics.getStats();
stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = getCorruptReplicaBlocks();
stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = getMissingBlocksCount();
stats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX] =
getLowRedundancyBlocks();
stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] =
getCorruptReplicaBlocks();
stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] =
getMissingBlocksCount();
stats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] =
getMissingReplOneBlocksCount();
stats[ClientProtocol.GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX] =
@ -4017,6 +4024,31 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return stats;
}
/**
* Get statistics pertaining to blocks of type {@link BlockType#CONTIGUOUS}
* in the filesystem.
* <p>
* @see ClientProtocol#getBlocksStats()
*/
BlocksStats getBlocksStats() {
return new BlocksStats(getLowRedundancyBlocksStat(),
getCorruptBlocksStat(), getMissingBlocksStat(),
getMissingReplicationOneBlocksStat(), getBlocksBytesInFutureStat(),
getPendingDeletionBlocksStat());
}
/**
* Get statistics pertaining to blocks of type {@link BlockType#STRIPED}
* in the filesystem.
* <p>
* @see ClientProtocol#getECBlockGroupsStats()
*/
ECBlockGroupsStats getECBlockGroupsStats() {
return new ECBlockGroupsStats(getLowRedundancyECBlockGroupsStat(),
getCorruptECBlockGroupsStat(), getMissingECBlockGroupsStat(),
getECBlocksBytesInFutureStat(), getPendingDeletionECBlockGroupsStat());
}
@Override // FSNamesystemMBean
@Metric({"CapacityTotal",
"Total raw capacity of data nodes in bytes"})
@ -4501,16 +4533,43 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return this.dir.totalInodes();
}
/**
* Get aggregated count of all blocks pending to be reconstructed.
*/
@Override // FSNamesystemMBean
@Metric
@Deprecated
public long getPendingReplicationBlocks() {
return blockManager.getPendingReconstructionBlocksCount();
}
/**
* Get aggregated count of all blocks pending to be reconstructed.
*/
@Override // FSNamesystemMBean
@Metric
public long getPendingReconstructionBlocks() {
return blockManager.getPendingReconstructionBlocksCount();
}
/**
* Get aggregated count of all blocks with low redundancy.
* @deprecated - Use {@link #getLowRedundancyBlocks()} instead.
*/
@Override // FSNamesystemMBean
@Metric
@Deprecated
public long getUnderReplicatedBlocks() {
return blockManager.getUnderReplicatedBlocksCount();
return blockManager.getLowRedundancyBlocksCount();
}
/**
* Get aggregated count of all blocks with low redundancy.
*/
@Override // FSNamesystemMBean
@Metric
public long getLowRedundancyBlocks() {
return blockManager.getLowRedundancyBlocksCount();
}
/** Returns number of blocks with corrupt replicas */
@ -4531,6 +4590,81 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return blockManager.getPendingDeletionBlocksCount();
}
@Override // ReplicatedBlocksMBean
@Metric({"LowRedundancyReplicatedBlocks",
"Number of low redundancy replicated blocks"})
public long getLowRedundancyBlocksStat() {
return blockManager.getLowRedundancyBlocksStat();
}
@Override // ReplicatedBlocksMBean
@Metric({"CorruptReplicatedBlocks", "Number of corrupted replicated blocks"})
public long getCorruptBlocksStat() {
return blockManager.getCorruptBlocksStat();
}
@Override // ReplicatedBlocksMBean
@Metric({"MissingReplicatedBlocks", "Number of missing replicated blocks"})
public long getMissingBlocksStat() {
return blockManager.getMissingBlocksStat();
}
@Override // ReplicatedBlocksMBean
@Metric({"MissingReplicatedOneBlocks", "Number of missing replicated blocks" +
" with replication factor 1"})
public long getMissingReplicationOneBlocksStat() {
return blockManager.getMissingReplicationOneBlocksStat();
}
@Override // ReplicatedBlocksMBean
@Metric({"BytesReplicatedFutureBlocks", "Total bytes in replicated blocks " +
"with future generation stamp"})
public long getBlocksBytesInFutureStat() {
return blockManager.getBytesInFutureReplicatedBlocksStat();
}
@Override // ReplicatedBlocksMBean
@Metric({"PendingDeletionReplicatedBlocks", "Number of replicated blocks " +
"that are pending deletion"})
public long getPendingDeletionBlocksStat() {
return blockManager.getPendingDeletionBlocksStat();
}
@Override // ECBlockGroupsStatsMBean
@Metric({"LowRedundancyECBlockGroups", "Number of erasure coded block " +
"groups with low redundancy"})
public long getLowRedundancyECBlockGroupsStat() {
return blockManager.getLowRedundancyECBlockGroupsStat();
}
@Override // ECBlockGroupsStatsMBean
@Metric({"CorruptECBlockGroups", "Number of erasure coded block groups that" +
" are corrupt"})
public long getCorruptECBlockGroupsStat() {
return blockManager.getCorruptECBlockGroupsStat();
}
@Override // ECBlockGroupsStatsMBean
@Metric({"MissingECBlockGroups", "Number of erasure coded block groups that" +
" are missing"})
public long getMissingECBlockGroupsStat() {
return blockManager.getMissingECBlockGroupsStat();
}
@Override // ECBlockGroupsStatsMBean
@Metric({"BytesFutureECBlockGroups", "Total bytes in erasure coded block " +
"groups with future generation stamp"})
public long getECBlocksBytesInFutureStat() {
return blockManager.getBytesInFutureStripedBlocksStat();
}
@Override // ECBlockGroupsStatsMBean
@Metric({"PendingDeletionECBlockGroups", "Number of erasure coded block " +
"groups that are pending deletion"})
public long getPendingDeletionECBlockGroupsStat() {
return blockManager.getPendingDeletionECBlockGroupsStat();
}
@Override
public long getBlockDeletionStartTime() {
return startTime + blockManager.getStartupDelayBlockDeletionInMs();
@ -4588,39 +4722,62 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return isInSafeMode() ? "safeMode" : "Operational";
}
private ObjectName mbeanName;
private ObjectName mxbeanName;
private ObjectName namesystemMBeanName, replicatedBlocksMBeanName,
ecBlockGroupsMBeanName, namenodeMXBeanName;
/**
* Register the FSNamesystem MBean using the name
* Register the following MBeans with their respective names.
* FSNamesystemMBean:
* "hadoop:service=NameNode,name=FSNamesystemState"
* ReplicatedBlocksStatsMBean:
* "hadoop:service=NameNode,name=ReplicatedBlocksState"
* ECBlockGroupsStatsMBean:
* "hadoop:service=NameNode,name=ECBlockGroupsState"
*/
private void registerMBean() {
// We can only implement one MXBean interface, so we keep the old one.
try {
StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class);
mbeanName = MBeans.register("NameNode", "FSNamesystemState", bean);
StandardMBean namesystemBean = new StandardMBean(
this, FSNamesystemMBean.class);
StandardMBean replicaBean = new StandardMBean(
this, ReplicatedBlocksStatsMBean.class);
StandardMBean ecBean = new StandardMBean(
this, ECBlockGroupsStatsMBean.class);
namesystemMBeanName = MBeans.register(
"NameNode", "FSNamesystemState", namesystemBean);
replicatedBlocksMBeanName = MBeans.register(
"NameNode", "ReplicatedBlocksState", replicaBean);
ecBlockGroupsMBeanName = MBeans.register(
"NameNode", "ECBlockGroupsState", ecBean);
} catch (NotCompliantMBeanException e) {
throw new RuntimeException("Bad MBean setup", e);
}
LOG.info("Registered FSNamesystemState MBean");
LOG.info("Registered FSNamesystemState, ReplicatedBlocksState and " +
"ECBlockGroupsState MBeans.");
}
/**
* shutdown FSNamesystem
* Shutdown FSNamesystem.
*/
void shutdown() {
if (snapshotManager != null) {
snapshotManager.shutdown();
}
if (mbeanName != null) {
MBeans.unregister(mbeanName);
mbeanName = null;
if (namesystemMBeanName != null) {
MBeans.unregister(namesystemMBeanName);
namesystemMBeanName = null;
}
if (mxbeanName != null) {
MBeans.unregister(mxbeanName);
mxbeanName = null;
if (replicatedBlocksMBeanName != null) {
MBeans.unregister(replicatedBlocksMBeanName);
replicatedBlocksMBeanName = null;
}
if (ecBlockGroupsMBeanName != null) {
MBeans.unregister(ecBlockGroupsMBeanName);
ecBlockGroupsMBeanName = null;
}
if (namenodeMXBeanName != null) {
MBeans.unregister(namenodeMXBeanName);
namenodeMXBeanName = null;
}
if (dir != null) {
dir.shutdown();
@ -5382,11 +5539,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
"fsck", src, null, null);
}
}
/**
* Register NameNodeMXBean
* Register NameNodeMXBean.
*/
private void registerMXBean() {
mxbeanName = MBeans.register("NameNode", "NameNodeInfo", this);
namenodeMXBeanName = MBeans.register("NameNode", "NameNodeInfo", this);
}
/**

View File

@ -97,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -112,6 +113,7 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@ -1150,6 +1152,20 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return namesystem.getStats();
}
@Override // ClientProtocol
public BlocksStats getBlocksStats() throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ);
return namesystem.getBlocksStats();
}
@Override // ClientProtocol
public ECBlockGroupsStats getECBlockGroupsStats() throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ);
return namesystem.getECBlockGroupsStats();
}
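As a hedged illustration of how a client can consume these two new RPCs: the fragment below mirrors DFSTestUtil#verifyClientStats further down in this change; the proxy creation and the stat getter names are taken from that helper, while the method wrapper and printed labels are assumptions for the sketch.

  // Illustrative fragment, not part of the patch. Requires the same imports
  // as DFSTestUtil: Configuration, FileSystem, NameNodeProxies,
  // ClientProtocol, BlocksStats and ECBlockGroupsStats.
  static void printBlockTypeStats(Configuration conf) throws IOException {
    ClientProtocol client = NameNodeProxies.createProxy(conf,
        FileSystem.getDefaultUri(conf), ClientProtocol.class).getProxy();
    BlocksStats replicatedStats = client.getBlocksStats();
    ECBlockGroupsStats ecStats = client.getECBlockGroupsStats();
    // Each aggregated getStats() counter should equal the sum of the
    // replicated and erasure coded counterparts.
    System.out.println("Low redundancy replicated blocks: "
        + replicatedStats.getLowRedundancyBlocksStat());
    System.out.println("Low redundancy EC block groups: "
        + ecStats.getLowRedundancyBlockGroupsStat());
  }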
@Override // ClientProtocol
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.metrics;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This interface defines the methods to get status pertaining to blocks of type
* {@link org.apache.hadoop.hdfs.protocol.BlockType#STRIPED} in FSNamesystem
* of a NameNode. It is also used for publishing via JMX.
* <p>
* Aggregated status of all blocks is reported in
* @see FSNamesystemMBean
* NameNode runtime activity statistics are reported in
* @see org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics
*
*/
@InterfaceAudience.Private
public interface ECBlockGroupsStatsMBean {
/**
* Return count of erasure coded block groups with low redundancy.
*/
long getLowRedundancyECBlockGroupsStat();
/**
* Return count of erasure coded block groups that are corrupt.
*/
long getCorruptECBlockGroupsStat();
/**
* Return count of erasure coded block groups that are missing.
*/
long getMissingECBlockGroupsStat();
/**
* Return total bytes of erasure coded block groups with future generation
* stamps.
*/
long getECBlocksBytesInFutureStat();
/**
* Return count of erasure coded block groups that are pending deletion.
*/
long getPendingDeletionECBlockGroupsStat();
}
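A minimal, illustrative sketch of polling these counters over JMX. The attribute names are the interface getters above minus the "get" prefix, and the ObjectName matches the registration in FSNamesystem; both are exercised by TestNameNodeMXBean later in this change. The method wrapper is an assumption; the snippet needs java.lang.management.ManagementFactory, javax.management.MBeanServer and javax.management.ObjectName.

  static void printEcBlockGroupState() throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName ecState =
        new ObjectName("Hadoop:service=NameNode,name=ECBlockGroupsState");
    // JMX attribute names drop the "get" prefix of the MBean getters.
    long missing = (Long) mbs.getAttribute(ecState,
        "MissingECBlockGroupsStat");
    long corrupt = (Long) mbs.getAttribute(ecState,
        "CorruptECBlockGroupsStat");
    System.out.println("Missing EC block groups: " + missing
        + ", corrupt EC block groups: " + corrupt);
  }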

View File

@ -78,17 +78,31 @@ public interface FSNamesystemMBean {
public long getFilesTotal();
/**
* Blocks pending to be replicated
* @return - num of blocks to be replicated
* Get aggregated count of all blocks pending to be reconstructed.
* @deprecated Use {@link #getPendingReconstructionBlocks()} instead.
*/
@Deprecated
public long getPendingReplicationBlocks();
/**
* Blocks under replicated
* @return - num of blocks under replicated
* Get aggregated count of all blocks pending to be reconstructed.
* @return Number of blocks pending to be reconstructed.
*/
public long getPendingReconstructionBlocks();
/**
* Get aggregated count of all blocks with low redundancy.
* @deprecated Use {@link #getLowRedundancyBlocks()} instead.
*/
@Deprecated
public long getUnderReplicatedBlocks();
/**
* Get aggregated count of all blocks with low redundancy.
* @return Number of blocks with low redundancy.
*/
public long getLowRedundancyBlocks();
/**
* Blocks scheduled for replication
* @return - num of blocks scheduled for replication

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.metrics;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This interface defines the methods to get status pertaining to blocks of type
* {@link org.apache.hadoop.hdfs.protocol.BlockType#CONTIGUOUS} in FSNamesystem
* of a NameNode. It is also used for publishing via JMX.
* <p>
* Aggregated status of all blocks is reported in
* @see FSNamesystemMBean
* NameNode runtime activity statistics are reported in
* @see org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics
*/
@InterfaceAudience.Private
public interface ReplicatedBlocksStatsMBean {
/**
* Return low redundancy blocks count.
*/
long getLowRedundancyBlocksStat();
/**
* Return corrupt blocks count.
*/
long getCorruptBlocksStat();
/**
* Return missing blocks count.
*/
long getMissingBlocksStat();
/**
* Return count of missing blocks with replication factor one.
*/
long getMissingReplicationOneBlocksStat();
/**
* Return total bytes of blocks with future generation stamps.
*/
long getBlocksBytesInFutureStat();
/**
* Return count of blocks that are pending deletion.
*/
long getPendingDeletionBlocksStat();
}
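A similar, purely illustrative sketch, this time using a typed MBean proxy instead of string attribute names. It assumes the bean is registered in-process under the name used by FSNamesystem#registerMBean earlier in this change, and needs javax.management.JMX in addition to the JMX imports above.

  static void printReplicatedBlockState() throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName replState =
        new ObjectName("Hadoop:service=NameNode,name=ReplicatedBlocksState");
    // JMX.newMBeanProxy gives compile-time checked access to the getters
    // declared in this interface.
    ReplicatedBlocksStatsMBean replicated =
        JMX.newMBeanProxy(mbs, replState, ReplicatedBlocksStatsMBean.class);
    System.out.println("Low redundancy replicated blocks: "
        + replicated.getLowRedundancyBlocksStat()
        + ", pending deletion: "
        + replicated.getPendingDeletionBlocksStat());
  }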

View File

@ -523,7 +523,7 @@ public class DFSAdmin extends FsShell {
* counts.
*/
System.out.println("Under replicated blocks: " +
dfs.getUnderReplicatedBlocksCount());
dfs.getLowRedundancyBlocksCount());
System.out.println("Blocks with corrupt replicas: " +
dfs.getCorruptBlocksCount());
System.out.println("Missing blocks: " +

View File

@ -110,10 +110,13 @@ import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -1650,6 +1653,50 @@ public class DFSTestUtil {
return true;
}
/**
* Verify the aggregated {@link ClientProtocol#getStats()} block counts equal
* the sum of {@link ClientProtocol#getBlocksStats()} and
* {@link ClientProtocol#getECBlockGroupsStats()}.
* @throws Exception
*/
public static void verifyClientStats(Configuration conf,
MiniDFSCluster cluster) throws Exception {
ClientProtocol client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(),
ClientProtocol.class).getProxy();
long[] aggregatedStats = cluster.getNameNode().getRpcServer().getStats();
BlocksStats blocksStats =
client.getBlocksStats();
ECBlockGroupsStats ecBlockGroupsStats = client.getECBlockGroupsStats();
assertEquals("Under replicated stats not matching!",
aggregatedStats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX],
aggregatedStats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]);
assertEquals("Low redundancy stats not matching!",
aggregatedStats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX],
blocksStats.getLowRedundancyBlocksStat() +
ecBlockGroupsStats.getLowRedundancyBlockGroupsStat());
assertEquals("Corrupt blocks stats not matching!",
aggregatedStats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX],
blocksStats.getCorruptBlocksStat() +
ecBlockGroupsStats.getCorruptBlockGroupsStat());
assertEquals("Missing blocks stats not matching!",
aggregatedStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX],
blocksStats.getMissingReplicaBlocksStat() +
ecBlockGroupsStats.getMissingBlockGroupsStat());
assertEquals("Missing blocks with replication factor one not matching!",
aggregatedStats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX],
blocksStats.getMissingReplicationOneBlocksStat());
assertEquals("Bytes in future blocks stats not matching!",
aggregatedStats[ClientProtocol.GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX],
blocksStats.getBytesInFutureBlocksStat() +
ecBlockGroupsStats.getBytesInFutureBlockGroupsStat());
assertEquals("Pending deletion blocks stats not matching!",
aggregatedStats[ClientProtocol.GET_STATS_PENDING_DELETION_BLOCKS_IDX],
blocksStats.getPendingDeletionBlocksStat() +
ecBlockGroupsStats.getPendingDeletionBlockGroupsStat());
}
/**
* Helper function to create a key in the Key Provider. Defaults
* to the first indexed NameNode's Key Provider.

View File

@ -281,7 +281,7 @@ public class TestFileCorruption {
@Override public Boolean get() {
try {
return cluster.getNamesystem().getBlockManager()
.getUnderReplicatedBlocksCount() == 1;
.getLowRedundancyBlocksCount() == 1;
} catch (Exception e) {
e.printStackTrace();
return false;

View File

@ -549,7 +549,7 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, replicas, 1);
writeFile(fileSys, file, replicas, 25);
DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,

View File

@ -98,7 +98,7 @@ public class TestMissingBlocksAlert {
Thread.sleep(100);
}
assertTrue(dfs.getMissingBlocksCount() == 1);
assertEquals(4, dfs.getUnderReplicatedBlocksCount());
assertEquals(4, dfs.getLowRedundancyBlocksCount());
assertEquals(3, bm.getUnderReplicatedNotMissingBlocks());
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@ -117,7 +117,7 @@ public class TestMissingBlocksAlert {
Thread.sleep(100);
}
assertEquals(2, dfs.getUnderReplicatedBlocksCount());
assertEquals(2, dfs.getLowRedundancyBlocksCount());
assertEquals(2, bm.getUnderReplicatedNotMissingBlocks());
Assert.assertEquals(0, (long)(Long) mbs.getAttribute(mxbeanName,

View File

@ -17,18 +17,24 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Random;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -53,10 +59,19 @@ public class TestComputeInvalidateWork {
private FSNamesystem namesystem;
private BlockManager bm;
private DatanodeDescriptor[] nodes;
private ErasureCodingPolicy ecPolicy;
private DistributedFileSystem fs;
private Path ecFile;
private int totalBlockGroups, blockGroupSize, stripesPerBlock, cellSize;
private LocatedStripedBlock locatedStripedBlock;
@Before
public void setup() throws Exception {
ecPolicy = SystemErasureCodingPolicies.getByID(
SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
ecPolicy.getName());
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES)
.build();
cluster.waitActive();
@ -65,6 +80,25 @@ public class TestComputeInvalidateWork {
nodes = bm.getDatanodeManager().getHeartbeatManager().getDatanodes();
BlockManagerTestUtil.stopRedundancyThread(bm);
assertEquals(nodes.length, NUM_OF_DATANODES);
// Create a striped file
Path ecDir = new Path("/ec");
fs = cluster.getFileSystem();
fs.mkdirs(ecDir);
fs.getClient().setErasureCodingPolicy(ecDir.toString(), ecPolicy.getName());
ecFile = new Path(ecDir, "ec-file");
stripesPerBlock = 2;
cellSize = ecPolicy.getCellSize();
int blockSize = stripesPerBlock * cellSize;
blockGroupSize = ecPolicy.getNumDataUnits() * blockSize;
totalBlockGroups = 4;
DFSTestUtil.createStripedFile(cluster, ecFile, ecDir, totalBlockGroups,
stripesPerBlock, false, ecPolicy);
LocatedBlocks lbs = cluster.getFileSystem().getClient().
getNamenode().getBlockLocations(
ecFile.toString(), 0, blockGroupSize);
assert lbs.get(0) instanceof LocatedStripedBlock;
locatedStripedBlock = (LocatedStripedBlock)(lbs.get(0));
}
@After
@ -75,12 +109,28 @@ public class TestComputeInvalidateWork {
}
}
private void verifyInvalidationWorkCounts(int blockInvalidateLimit) {
assertEquals(blockInvalidateLimit * NUM_OF_DATANODES,
bm.computeInvalidateWork(NUM_OF_DATANODES + 1));
assertEquals(blockInvalidateLimit * NUM_OF_DATANODES,
bm.computeInvalidateWork(NUM_OF_DATANODES));
assertEquals(blockInvalidateLimit * (NUM_OF_DATANODES - 1),
bm.computeInvalidateWork(NUM_OF_DATANODES - 1));
int workCount = bm.computeInvalidateWork(1);
if (workCount == 1) {
assertEquals(blockInvalidateLimit + 1, bm.computeInvalidateWork(2));
} else {
assertEquals(workCount, blockInvalidateLimit);
assertEquals(2, bm.computeInvalidateWork(2));
}
}
/**
* Test if {@link BlockManager#computeInvalidateWork(int)}
* can schedule invalidate work correctly
* can schedule invalidate work correctly for the replicas.
*/
@Test(timeout=120000)
public void testCompInvalidate() throws Exception {
public void testComputeInvalidateReplicas() throws Exception {
final int blockInvalidateLimit = bm.getDatanodeManager()
.getBlockInvalidateLimit();
namesystem.writeLock();
@ -92,20 +142,66 @@ public class TestComputeInvalidateWork {
bm.addToInvalidates(block, nodes[i]);
}
}
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
bm.computeInvalidateWork(NUM_OF_DATANODES+1));
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
bm.computeInvalidateWork(NUM_OF_DATANODES));
assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
bm.computeInvalidateWork(NUM_OF_DATANODES-1));
int workCount = bm.computeInvalidateWork(1);
if (workCount == 1) {
assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
} else {
assertEquals(workCount, blockInvalidateLimit);
assertEquals(2, bm.computeInvalidateWork(2));
verifyInvalidationWorkCounts(blockInvalidateLimit);
} finally {
namesystem.writeUnlock();
}
}
/**
* Test if {@link BlockManager#computeInvalidateWork(int)}
* can schedule invalidate work correctly for the striped block groups.
*/
@Test(timeout=120000)
public void testComputeInvalidateStripedBlockGroups() throws Exception {
final int blockInvalidateLimit =
bm.getDatanodeManager().getBlockInvalidateLimit();
namesystem.writeLock();
try {
int nodeCount = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
for (int i = 0; i < nodeCount; i++) {
for(int j = 0; j < 3 * blockInvalidateLimit + 1; j++) {
Block blk = new Block(locatedStripedBlock.getBlock().getBlockId() +
(i * 10 + j), stripesPerBlock * cellSize,
locatedStripedBlock.getBlock().getGenerationStamp());
bm.addToInvalidates(blk, nodes[i]);
}
}
verifyInvalidationWorkCounts(blockInvalidateLimit);
} finally {
namesystem.writeUnlock();
}
}
/**
* Test if {@link BlockManager#computeInvalidateWork(int)}
* can schedule invalidate work correctly for both replicas and striped
* block groups, combined.
*/
@Test(timeout=120000)
public void testComputeInvalidate() throws Exception {
final int blockInvalidateLimit =
bm.getDatanodeManager().getBlockInvalidateLimit();
final Random random = new Random(System.currentTimeMillis());
namesystem.writeLock();
try {
int nodeCount = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
for (int i = 0; i < nodeCount; i++) {
for(int j = 0; j < 3 * blockInvalidateLimit + 1; j++) {
if (random.nextBoolean()) {
Block stripedBlock = new Block(
locatedStripedBlock.getBlock().getBlockId() + (i * 10 + j),
stripesPerBlock * cellSize,
locatedStripedBlock.getBlock().getGenerationStamp());
bm.addToInvalidates(stripedBlock, nodes[i]);
} else {
Block replica = new Block(i * (blockInvalidateLimit + 1) + j, 0,
GenerationStamp.LAST_RESERVED_STAMP);
bm.addToInvalidates(replica, nodes[i]);
}
}
}
verifyInvalidationWorkCounts(blockInvalidateLimit);
} finally {
namesystem.writeUnlock();
}
@ -129,6 +225,11 @@ public class TestComputeInvalidateWork {
Block block = new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP);
bm.addToInvalidates(block, nodes[0]);
Block stripedBlock = new Block(
locatedStripedBlock.getBlock().getBlockId() + 100,
stripesPerBlock * cellSize,
locatedStripedBlock.getBlock().getGenerationStamp());
bm.addToInvalidates(stripedBlock, nodes[0]);
bm.getDatanodeManager().registerDatanode(dnr);
// Since UUID has changed, the invalidation work should be skipped
@ -145,26 +246,37 @@ public class TestComputeInvalidateWork {
final DistributedFileSystem dfs = cluster.getFileSystem();
final Path path = new Path("/testRR");
// Create a file and shutdown the DNs, which populates InvalidateBlocks
short totalReplicas = NUM_OF_DATANODES;
DFSTestUtil.createFile(dfs, path, dfs.getDefaultBlockSize(),
(short) NUM_OF_DATANODES, 0xED0ED0);
totalReplicas, 0xED0ED0);
DFSTestUtil.waitForReplication(dfs, path, (short) NUM_OF_DATANODES, 12000);
for (DataNode dn : cluster.getDataNodes()) {
dn.shutdown();
}
dfs.delete(path, false);
dfs.delete(ecFile, false);
namesystem.writeLock();
InvalidateBlocks invalidateBlocks;
int expected = NUM_OF_DATANODES;
int totalStripedDataBlocks = totalBlockGroups * (ecPolicy.getNumDataUnits()
+ ecPolicy.getNumParityUnits());
int expected = totalReplicas + totalStripedDataBlocks;
try {
invalidateBlocks = (InvalidateBlocks) Whitebox
.getInternalState(cluster.getNamesystem().getBlockManager(),
"invalidateBlocks");
assertEquals("Expected invalidate blocks to be the number of DNs",
assertEquals("Invalidate blocks should include both Replicas and " +
"Striped BlockGroups!",
(long) expected, invalidateBlocks.numBlocks());
assertEquals("Unexpected invalidate count for replicas!",
totalReplicas, invalidateBlocks.getBlocksStat());
assertEquals("Unexpected invalidate count for striped block groups!",
totalStripedDataBlocks, invalidateBlocks.getECBlockGroupsStat());
} finally {
namesystem.writeUnlock();
}
// Re-register each DN and see that it wipes the invalidation work
int totalBlockGroupsPerDataNode = totalBlockGroups;
int totalReplicasPerDataNode = totalReplicas / NUM_OF_DATANODES;
for (DataNode dn : cluster.getDataNodes()) {
DatanodeID did = dn.getDatanodeId();
DatanodeRegistration reg = new DatanodeRegistration(
@ -175,7 +287,7 @@ public class TestComputeInvalidateWork {
namesystem.writeLock();
try {
bm.getDatanodeManager().registerDatanode(reg);
expected--;
expected -= (totalReplicasPerDataNode + totalBlockGroupsPerDataNode);
assertEquals("Expected number of invalidate blocks to decrease",
(long) expected, invalidateBlocks.numBlocks());
} finally {

View File

@ -25,14 +25,13 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.junit.Test;
@ -45,88 +44,130 @@ import org.junit.Test;
*/
public class TestCorruptReplicaInfo {
private static final Log LOG =
LogFactory.getLog(TestCorruptReplicaInfo.class);
private final Map<Long, Block> block_map =
new HashMap<Long, Block>();
// Allow easy block creation by block id
// Return existing block if one with same block id already exists
private Block getBlock(Long block_id) {
if (!block_map.containsKey(block_id)) {
block_map.put(block_id, new Block(block_id,0,0));
private static final Log LOG = LogFactory.getLog(
TestCorruptReplicaInfo.class);
private final Map<Long, Block> replicaMap = new HashMap<>();
private final Map<Long, Block> stripedBlocksMap = new HashMap<>();
// Allow easy block creation by block id. Return existing
// replica block if one with same block id already exists.
private Block getReplica(Long blockId) {
if (!replicaMap.containsKey(blockId)) {
replicaMap.put(blockId, new Block(blockId, 0, 0));
}
return block_map.get(block_id);
return replicaMap.get(blockId);
}
private Block getBlock(int block_id) {
return getBlock((long)block_id);
private Block getReplica(int blkId) {
return getReplica(Long.valueOf(blkId));
}
private Block getStripedBlock(int blkId) {
Long stripedBlockId = (1L << 63) + blkId;
assertTrue(BlockIdManager.isStripedBlockID(stripedBlockId));
if (!stripedBlocksMap.containsKey(stripedBlockId)) {
stripedBlocksMap.put(stripedBlockId, new Block(stripedBlockId, 1024, 0));
}
return stripedBlocksMap.get(stripedBlockId);
}
private void verifyCorruptBlocksCount(CorruptReplicasMap corruptReplicasMap,
long expectedReplicaCount, long expectedStripedBlockCount) {
long totalExpectedCorruptBlocks = expectedReplicaCount +
expectedStripedBlockCount;
assertEquals("Unexpected total corrupt blocks count!",
totalExpectedCorruptBlocks, corruptReplicasMap.size());
assertEquals("Unexpected replica blocks count!",
expectedReplicaCount, corruptReplicasMap.getCorruptBlocksStat());
assertEquals("Unexpected striped blocks count!",
expectedStripedBlockCount,
corruptReplicasMap.getCorruptECBlockGroupsStat());
}
@Test
public void testCorruptReplicaInfo() throws IOException,
InterruptedException {
CorruptReplicasMap crm = new CorruptReplicasMap();
// Make sure initial values are returned correctly
assertEquals("Number of corrupt blocks must initially be 0", 0, crm.size());
assertNull("Param n cannot be less than 0", crm.getCorruptReplicaBlockIdsForTesting(-1, null));
assertNull("Param n cannot be greater than 100", crm.getCorruptReplicaBlockIdsForTesting(101, null));
long[] l = crm.getCorruptReplicaBlockIdsForTesting(0, null);
assertNotNull("n = 0 must return non-null", l);
assertEquals("n = 0 must return an empty list", 0, l.length);
public void testCorruptReplicaInfo()
throws IOException, InterruptedException {
CorruptReplicasMap crm = new CorruptReplicasMap();
// create a list of block_ids. A list is used to allow easy validation of the
// output of getCorruptReplicaBlockIds
int NUM_BLOCK_IDS = 140;
List<Long> block_ids = new LinkedList<Long>();
for (int i=0;i<NUM_BLOCK_IDS;i++) {
block_ids.add((long)i);
}
DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
addToCorruptReplicasMap(crm, getBlock(0), dn1);
assertEquals("Number of corrupt blocks not returning correctly",
1, crm.size());
addToCorruptReplicasMap(crm, getBlock(1), dn1);
assertEquals("Number of corrupt blocks not returning correctly",
2, crm.size());
addToCorruptReplicasMap(crm, getBlock(1), dn2);
assertEquals("Number of corrupt blocks not returning correctly",
2, crm.size());
crm.removeFromCorruptReplicasMap(getBlock(1));
assertEquals("Number of corrupt blocks not returning correctly",
1, crm.size());
crm.removeFromCorruptReplicasMap(getBlock(0));
assertEquals("Number of corrupt blocks not returning correctly",
0, crm.size());
for (Long block_id: block_ids) {
addToCorruptReplicasMap(crm, getBlock(block_id), dn1);
}
assertEquals("Number of corrupt blocks not returning correctly",
NUM_BLOCK_IDS, crm.size());
assertTrue("First five block ids not returned correctly ",
Arrays.equals(new long[]{0,1,2,3,4},
crm.getCorruptReplicaBlockIdsForTesting(5, null)));
LOG.info(crm.getCorruptReplicaBlockIdsForTesting(10, 7L));
LOG.info(block_ids.subList(7, 18));
// Make sure initial values are returned correctly
assertEquals("Total number of corrupt blocks must initially be 0!",
0, crm.size());
assertEquals("Number of corrupt replicas must initially be 0!",
0, crm.getCorruptBlocksStat());
assertEquals("Number of corrupt striped block groups must initially be 0!",
0, crm.getCorruptECBlockGroupsStat());
assertNull("Param n cannot be less than 0",
crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, -1, null));
assertNull("Param n cannot be greater than 100",
crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 101, null));
long[] l = crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 0, null);
assertNotNull("n = 0 must return non-null", l);
assertEquals("n = 0 must return an empty list", 0, l.length);
assertTrue("10 blocks after 7 not returned correctly ",
Arrays.equals(new long[]{8,9,10,11,12,13,14,15,16,17},
crm.getCorruptReplicaBlockIdsForTesting(10, 7L)));
// Create a list of block ids. A list is used to allow easy
// validation of the output of getCorruptReplicaBlockIds.
final int blockCount = 140;
long[] replicaIds = new long[blockCount];
long[] stripedIds = new long[blockCount];
for (int i = 0; i < blockCount; i++) {
replicaIds[i] = getReplica(i).getBlockId();
stripedIds[i] = getStripedBlock(i).getBlockId();
}
DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
// Add to corrupt blocks map.
// Replicas
addToCorruptReplicasMap(crm, getReplica(0), dn1);
verifyCorruptBlocksCount(crm, 1, 0);
addToCorruptReplicasMap(crm, getReplica(1), dn1);
verifyCorruptBlocksCount(crm, 2, 0);
addToCorruptReplicasMap(crm, getReplica(1), dn2);
verifyCorruptBlocksCount(crm, 2, 0);
// Striped blocks
addToCorruptReplicasMap(crm, getStripedBlock(0), dn1);
verifyCorruptBlocksCount(crm, 2, 1);
addToCorruptReplicasMap(crm, getStripedBlock(1), dn1);
verifyCorruptBlocksCount(crm, 2, 2);
addToCorruptReplicasMap(crm, getStripedBlock(1), dn2);
verifyCorruptBlocksCount(crm, 2, 2);
// Remove from corrupt blocks map.
// Replicas
crm.removeFromCorruptReplicasMap(getReplica(1));
verifyCorruptBlocksCount(crm, 1, 2);
crm.removeFromCorruptReplicasMap(getReplica(0));
verifyCorruptBlocksCount(crm, 0, 2);
// Striped blocks
crm.removeFromCorruptReplicasMap(getStripedBlock(1));
verifyCorruptBlocksCount(crm, 0, 1);
crm.removeFromCorruptReplicasMap(getStripedBlock(0));
verifyCorruptBlocksCount(crm, 0, 0);
for (int blockId = 0; blockId < blockCount; blockId++) {
addToCorruptReplicasMap(crm, getReplica(blockId), dn1);
addToCorruptReplicasMap(crm, getStripedBlock(blockId), dn1);
}
assertEquals("Number of corrupt blocks not returning correctly",
2 * blockCount, crm.size());
assertTrue("First five corrupt replica blocks ids are not right!",
Arrays.equals(Arrays.copyOfRange(replicaIds, 0, 5),
crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 5, null)));
assertTrue("First five corrupt striped blocks ids are not right!",
Arrays.equals(Arrays.copyOfRange(stripedIds, 0, 5),
crm.getCorruptBlockIdsForTesting(BlockType.STRIPED, 5, null)));
assertTrue("10 replica blocks after 7 not returned correctly!",
Arrays.equals(Arrays.copyOfRange(replicaIds, 7, 17),
crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 10, 7L)));
assertTrue("10 striped blocks after 7 not returned correctly!",
Arrays.equals(Arrays.copyOfRange(stripedIds, 7, 17),
crm.getCorruptBlockIdsForTesting(BlockType.STRIPED,
10, getStripedBlock(7).getBlockId())));
}
private static void addToCorruptReplicasMap(CorruptReplicasMap crm,

View File

@ -45,9 +45,32 @@ public class TestLowRedundancyBlockQueues {
return sblk;
}
private void verifyBlockStats(LowRedundancyBlocks queues,
int lowRedundancyReplicaCount, int corruptReplicaCount,
int corruptReplicationOneCount, int lowRedundancyStripedCount,
int corruptStripedCount) {
assertEquals("Low redundancy replica count incorrect!",
lowRedundancyReplicaCount, queues.getLowRedundancyBlocksStat());
assertEquals("Corrupt replica count incorrect!",
corruptReplicaCount, queues.getCorruptBlocksStat());
assertEquals("Corrupt replica one count incorrect!",
corruptReplicationOneCount,
queues.getCorruptReplicationOneBlocksStat());
assertEquals("Low redundancy striped blocks count incorrect!",
lowRedundancyStripedCount, queues.getLowRedundancyECBlockGroupsStat());
assertEquals("Corrupt striped blocks count incorrect!",
corruptStripedCount, queues.getCorruptECBlockGroupsStat());
assertEquals("Low Redundancy count incorrect!",
lowRedundancyReplicaCount + lowRedundancyStripedCount,
queues.getLowRedundancyBlockCount());
assertEquals("LowRedundancyBlocks queue size incorrect!",
(lowRedundancyReplicaCount + corruptReplicaCount +
lowRedundancyStripedCount + corruptStripedCount), queues.size());
}
/**
* Test that adding blocks with different replication counts puts them
* into different queues
* into different queues.
* @throws Throwable if something goes wrong
*/
@Test
@ -59,43 +82,45 @@ public class TestLowRedundancyBlockQueues {
BlockInfo block_corrupt = genBlockInfo(4);
BlockInfo block_corrupt_repl_one = genBlockInfo(5);
//add a block with a single entry
// Add a block with a single entry
assertAdded(queues, block1, 1, 0, 3);
assertEquals(1, queues.getLowRedundancyBlockCount());
assertEquals(1, queues.size());
assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
//repeated additions fail
assertFalse(queues.add(block1, 1, 0, 0, 3));
verifyBlockStats(queues, 1, 0, 0, 0, 0);
//add a second block with two replicas
// Repeated additions fail
assertFalse(queues.add(block1, 1, 0, 0, 3));
verifyBlockStats(queues, 1, 0, 0, 0, 0);
// Add a second block with two replicas
assertAdded(queues, block2, 2, 0, 3);
assertEquals(2, queues.getLowRedundancyBlockCount());
assertEquals(2, queues.size());
assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
//now try to add a block that is corrupt
verifyBlockStats(queues, 2, 0, 0, 0, 0);
// Now try to add a block that is corrupt
assertAdded(queues, block_corrupt, 0, 0, 3);
assertEquals(3, queues.size());
assertEquals(2, queues.getLowRedundancyBlockCount());
assertEquals(1, queues.getCorruptBlockSize());
assertInLevel(queues, block_corrupt,
LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
verifyBlockStats(queues, 2, 1, 0, 0, 0);
//insert a very insufficiently redundancy block
// Insert a very insufficiently redundancy block
assertAdded(queues, block_very_low_redundancy, 4, 0, 25);
assertInLevel(queues, block_very_low_redundancy,
LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
verifyBlockStats(queues, 3, 1, 0, 0, 0);
//insert a corrupt block with replication factor 1
// Insert a corrupt block with replication factor 1
assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
assertEquals(2, queues.getCorruptBlockSize());
assertEquals(1, queues.getCorruptReplOneBlockSize());
verifyBlockStats(queues, 3, 2, 1, 0, 0);
// Bump up the expected count for corrupt replica one block from 1 to 3
queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
assertEquals(0, queues.getCorruptReplOneBlockSize());
verifyBlockStats(queues, 3, 2, 0, 0, 0);
// Reduce the expected replicas to 1
queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
assertEquals(1, queues.getCorruptReplOneBlockSize());
verifyBlockStats(queues, 3, 2, 1, 0, 0);
queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24);
assertEquals(2, queues.getCorruptReplOneBlockSize());
verifyBlockStats(queues, 2, 3, 2, 0, 0);
}
@Test
@ -131,16 +156,18 @@ public class TestLowRedundancyBlockQueues {
assertInLevel(queues, block,
LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
}
verifyBlockStats(queues, 0, 0, 0, numUR, 0);
}
// add a corrupted block
BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes);
assertEquals(numCorrupt, queues.getCorruptBlockSize());
verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt);
assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
numCorrupt++;
assertEquals(numUR + numCorrupt, queues.size());
assertEquals(numUR, queues.getLowRedundancyBlockCount());
assertEquals(numCorrupt, queues.getCorruptBlockSize());
verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt);
assertInLevel(queues, block_corrupt,
LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}

View File

@ -37,38 +37,51 @@ import java.util.Iterator;
public class TestUnderReplicatedBlocks {
@Test(timeout=60000) // 1 min timeout
public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
@Test(timeout=120000) // 2 min timeout
public void testSetRepIncWithUnderReplicatedBlocks() throws Exception {
Configuration conf = new HdfsConfiguration();
final short REPLICATION_FACTOR = 2;
final String FILE_NAME = "/testFile";
final Path FILE_PATH = new Path(FILE_NAME);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR + 1).build();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
numDataNodes(REPLICATION_FACTOR + 1).build();
try {
// create a file with one block with a replication factor of 2
final FileSystem fs = cluster.getFileSystem();
final BlockManager bm = cluster.getNamesystem().getBlockManager();
DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
// remove one replica from the blocksMap so block becomes under-replicated
// but the block does not get put into the under-replicated blocks queue
final BlockManager bm = cluster.getNamesystem().getBlockManager();
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
DatanodeDescriptor dn = bm.blocksMap.getStorages(b.getLocalBlock())
.iterator().next().getDatanodeDescriptor();
bm.addToInvalidates(b.getLocalBlock(), dn);
// Compute the invalidate work in NN, and trigger the heartbeat from DN
BlockManagerTestUtil.computeAllPendingWork(bm);
DataNodeTestUtils.triggerHeartbeat(cluster.getDataNode(dn.getIpcPort()));
// Wait to make sure the DataNode receives the deletion request
Thread.sleep(5000);
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
// Remove the record from blocksMap
bm.blocksMap.removeNode(b.getLocalBlock(), dn);
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
// increment this file's replication factor
FsShell shell = new FsShell(conf);
assertEquals(0, shell.run(new String[]{
"-setrep", "-w", Integer.toString(1+REPLICATION_FACTOR), FILE_NAME}));
assertEquals(0, shell.run(new String[] {
"-setrep", "-w", Integer.toString(1 + REPLICATION_FACTOR),
FILE_NAME }));
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}
@ -126,25 +139,30 @@ public class TestUnderReplicatedBlocks {
final BlockManager bm = cluster.getNamesystem().getBlockManager();
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
Iterator<DatanodeStorageInfo> storageInfos =
bm.blocksMap.getStorages(b.getLocalBlock())
.iterator();
bm.blocksMap.getStorages(b.getLocalBlock()).iterator();
DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
DatanodeDescriptor secondDn = storageInfos.next().getDatanodeDescriptor();
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
bm.getDatanodeManager().removeDatanode(firstDn);
BlockManagerTestUtil.updateState(bm);
assertEquals(NUM_OF_BLOCKS, bm.getUnderReplicatedNotMissingBlocks());
bm.computeDatanodeWork();
DFSTestUtil.verifyClientStats(conf, cluster);
bm.computeDatanodeWork();
assertTrue("The number of replication work pending before targets are " +
"determined should be non-negative.",
(Integer)Whitebox.getInternalState(secondDn,
"pendingReplicationWithoutTargets") >= 0);
BlockManagerTestUtil.updateState(bm);
assertTrue("The number of blocks to be replicated should be less than "
+ "or equal to " + bm.replicationStreamsHardLimit,
secondDn.getNumberOfBlocksToBeReplicated()
<= bm.replicationStreamsHardLimit);
DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}

View File

@ -199,7 +199,7 @@ public class TestReadOnlySharedStorage {
assertThat(numberReplicas.replicasOnStaleNodes(), is(0));
BlockManagerTestUtil.updateState(blockManager);
assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L));
assertThat(blockManager.getLowRedundancyBlocksCount(), is(0L));
assertThat(blockManager.getExcessBlocksCount(), is(0L));
}
@ -238,7 +238,7 @@ public class TestReadOnlySharedStorage {
// The block should be reported as under-replicated
BlockManagerTestUtil.updateState(blockManager);
assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L));
assertThat(blockManager.getLowRedundancyBlocksCount(), is(1L));
// The BlockManager should be able to heal the replication count back to 1
// by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas

View File

@ -118,7 +118,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
assertThat(cluster.getNameNode()
.getNamesystem()
.getBlockManager()
.getUnderReplicatedBlocksCount(),
.getLowRedundancyBlocksCount(),
is(0L));
}

View File

@ -424,7 +424,9 @@ public class TestAddStripedBlocks {
cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
// the total number of corrupted block info is still 1
Assert.assertEquals(1, ns.getCorruptECBlockGroupsStat());
Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
Assert.assertEquals(0, ns.getCorruptBlocksStat());
// 2 internal blocks corrupted
Assert.assertEquals(2, bm.getCorruptReplicas(stored).size());

View File

@ -410,7 +410,7 @@ public class TestDecommissioningStatus {
// All nodes are dead and decommed. Blocks should be missing.
long missingBlocks = bm.getMissingBlocksCount();
long underreplicated = bm.getUnderReplicatedBlocksCount();
long underreplicated = bm.getLowRedundancyBlocksCount();
assertTrue(missingBlocks > 0);
assertTrue(underreplicated > 0);
@ -440,7 +440,7 @@ public class TestDecommissioningStatus {
// Blocks should be still be under-replicated
Thread.sleep(2000); // Let replication monitor run
assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount());
assertEquals(underreplicated, bm.getLowRedundancyBlocksCount());
// Start up a node.
LOG.info("Starting two more nodes");
@ -448,13 +448,13 @@ public class TestDecommissioningStatus {
cluster.waitActive();
// Replication should fix it.
int count = 0;
while((bm.getUnderReplicatedBlocksCount() > 0 ||
while((bm.getLowRedundancyBlocksCount() > 0 ||
bm.getPendingReconstructionBlocksCount() > 0) &&
count++ < 10) {
Thread.sleep(1000);
}
assertEquals(0, bm.getUnderReplicatedBlocksCount());
assertEquals(0, bm.getLowRedundancyBlocksCount());
assertEquals(0, bm.getPendingReconstructionBlocksCount());
assertEquals(0, bm.getMissingBlocksCount());

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@ -774,17 +775,24 @@ public class TestNameNodeMXBean {
}
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
ObjectName replStateMBeanName = new ObjectName(
"Hadoop:service=NameNode,name=ReplicatedBlocksState");
ObjectName ecBlkGrpStateMBeanName = new ObjectName(
"Hadoop:service=NameNode,name=ECBlockGroupsState");
ObjectName namenodeMXBeanName = new ObjectName(
"Hadoop:service=NameNode,name=NameNodeInfo");
// Wait for the metrics to discover the unrecoverable block group
long expectedMissingBlockCount = 1L;
long expectedCorruptBlockCount = 1L;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
Long numMissingBlocks =
(Long) mbs.getAttribute(mxbeanName, "NumberOfMissingBlocks");
if (numMissingBlocks == 1L) {
(Long) mbs.getAttribute(namenodeMXBeanName,
"NumberOfMissingBlocks");
if (numMissingBlocks == expectedMissingBlockCount) {
return true;
}
} catch (Exception e) {
@ -794,7 +802,43 @@ public class TestNameNodeMXBean {
}
}, 1000, 60000);
String corruptFiles = (String) (mbs.getAttribute(mxbeanName,
BlockManagerTestUtil.updateState(
cluster.getNamesystem().getBlockManager());
// Verification of missing blocks
long totalMissingBlocks = cluster.getNamesystem().getMissingBlocksCount();
Long replicaMissingBlocks =
(Long) mbs.getAttribute(replStateMBeanName,
"MissingBlocksStat");
Long ecMissingBlocks =
(Long) mbs.getAttribute(ecBlkGrpStateMBeanName,
"MissingECBlockGroupsStat");
assertEquals("Unexpected total missing blocks!",
expectedMissingBlockCount, totalMissingBlocks);
assertEquals("Unexpected total missing blocks!",
totalMissingBlocks,
(replicaMissingBlocks + ecMissingBlocks));
assertEquals("Unexpected total ec missing blocks!",
expectedMissingBlockCount, ecMissingBlocks.longValue());
// Verification of corrupt blocks
long totalCorruptBlocks =
cluster.getNamesystem().getCorruptReplicaBlocks();
Long replicaCorruptBlocks =
(Long) mbs.getAttribute(replStateMBeanName,
"CorruptBlocksStat");
Long ecCorruptBlocks =
(Long) mbs.getAttribute(ecBlkGrpStateMBeanName,
"CorruptECBlockGroupsStat");
assertEquals("Unexpected total corrupt blocks!",
expectedCorruptBlockCount, totalCorruptBlocks);
assertEquals("Unexpected total corrupt blocks!",
totalCorruptBlocks,
(replicaCorruptBlocks + ecCorruptBlocks));
assertEquals("Unexpected total ec corrupt blocks!",
expectedCorruptBlockCount, ecCorruptBlocks.longValue());
String corruptFiles = (String) (mbs.getAttribute(namenodeMXBeanName,
"CorruptFiles"));
int numCorruptFiles = ((Object[]) JSON.parse(corruptFiles)).length;
assertEquals(1, numCorruptFiles);

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -50,6 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
@ -157,6 +159,8 @@ public class TestReconstructStripedBlocks {
assertEquals(numBlocks, missedNode.numBlocks());
bm.getDatanodeManager().removeDatanode(missedNode);
}
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
@ -185,6 +189,8 @@ public class TestReconstructStripedBlocks {
info.getSourceDnInfos().length);
}
}
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}
@ -212,6 +218,8 @@ public class TestReconstructStripedBlocks {
final byte[] data = new byte[fileLen];
DFSTestUtil.writeFile(fs, p, data);
DFSTestUtil.waitForReplication(fs, p, groupSize, 5000);
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
LocatedStripedBlock lb = (LocatedStripedBlock)fs.getClient()
.getLocatedBlocks(p.toString(), 0).get(0);
@ -219,16 +227,20 @@ public class TestReconstructStripedBlocks {
cellSize, dataBlocks, parityBlocks);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
BlockManagerTestUtil.updateState(bm);
assertEquals(0, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(0, bm.getPendingReconstructionBlocksCount());
DFSTestUtil.verifyClientStats(conf, cluster);
// missing 1 block, so 1 task should be scheduled
DatanodeInfo dn0 = lbs[0].getLocations()[0];
cluster.stopDataNode(dn0.getName());
cluster.setDataNodeDead(dn0);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
BlockManagerTestUtil.updateState(bm);
assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(1, bm.getPendingReconstructionBlocksCount());
DFSTestUtil.verifyClientStats(conf, cluster);
// missing another block, but no new task should be scheduled because
// previous task isn't finished.
@ -236,8 +248,10 @@ public class TestReconstructStripedBlocks {
cluster.stopDataNode(dn1.getName());
cluster.setDataNodeDead(dn1);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
BlockManagerTestUtil.updateState(bm);
assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(1, bm.getPendingReconstructionBlocksCount());
DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}
@ -294,6 +308,7 @@ public class TestReconstructStripedBlocks {
// bring the dn back: 10 internal blocks now
cluster.restartDataNode(dnProp);
cluster.waitActive();
DFSTestUtil.verifyClientStats(conf, cluster);
// stop another dn: 9 internal blocks, but only cover 8 real one
dnToStop = block.getLocations()[1];
@ -352,4 +367,72 @@ public class TestReconstructStripedBlocks {
cluster.shutdown();
}
}
@Test(timeout=120000) // 2 min timeout
public void testReconstructionWork() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1000);
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
5);
ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByID(
SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
conf.setStrings(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
policy.getName());
Path ecDir = new Path("/ec");
Path ecFilePath = new Path(ecDir, "ec-file");
int blockGroups = 2;
int totalDataNodes = policy.getNumDataUnits() +
policy.getNumParityUnits() + 1;
MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(
totalDataNodes).build();
try {
// create an EC file with 2 block groups
final DistributedFileSystem fs = dfsCluster.getFileSystem();
fs.mkdirs(ecDir);
fs.setErasureCodingPolicy(ecDir, policy.getName());
DFSTestUtil.createStripedFile(dfsCluster, ecFilePath, ecDir,
blockGroups, 2, false, policy);
final BlockManager bm = dfsCluster.getNamesystem().getBlockManager();
LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations(
ecFilePath.toString(), 0, blockGroups);
assert lbs.get(0) instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
Iterator<DatanodeStorageInfo> storageInfos =
bm.getStorages(bg.getBlock().getLocalBlock()).iterator();
DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, dfsCluster);
// Remove one of the DataUnit nodes
bm.getDatanodeManager().removeDatanode(firstDn);
// Verify low redundancy count matching EC block groups count
BlockManagerTestUtil.updateState(bm);
assertEquals(blockGroups, bm.getLowRedundancyECBlockGroupsStat());
DFSTestUtil.verifyClientStats(conf, dfsCluster);
// Trigger block group reconstruction
BlockManagerTestUtil.getComputedDatanodeWork(bm);
BlockManagerTestUtil.updateState(bm);
// Verify pending reconstruction count
assertEquals(blockGroups, getNumberOfBlocksToBeErasureCoded(dfsCluster));
assertEquals(0, bm.getLowRedundancyECBlockGroupsStat());
DFSTestUtil.verifyClientStats(conf, dfsCluster);
} finally {
dfsCluster.shutdown();
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystemTestWrapper;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
@ -57,8 +58,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@ -90,17 +95,23 @@ public class TestNameNodeMetrics {
new Path("/testNameNodeMetrics");
private static final String NN_METRICS = "NameNodeActivity";
private static final String NS_METRICS = "FSNamesystem";
private static final int BLOCK_SIZE = 1024 * 1024;
private static final ErasureCodingPolicy EC_POLICY =
SystemErasureCodingPolicies.getByID(
SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
public static final Log LOG = LogFactory.getLog(TestNameNodeMetrics.class);
// Number of datanodes in the cluster
private static final int DATANODE_COUNT = 3;
private static final int DATANODE_COUNT = EC_POLICY.getNumDataUnits() +
EC_POLICY.getNumParityUnits() + 1;
private static final int WAIT_GAUGE_VALUE_RETRIES = 20;
// Rollover interval of percentile metrics (in seconds)
private static final int PERCENTILES_INTERVAL = 1;
static {
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFS_REDUNDANCY_INTERVAL);
@ -109,7 +120,11 @@ public class TestNameNodeMetrics {
CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
"" + PERCENTILES_INTERVAL);
// Enable stale DataNodes checking
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
CONF.setBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
// Enable erasure coding
CONF.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
EC_POLICY.getName());
GenericTestUtils.setLogLevel(LogFactory.getLog(MetricsAsserts.class),
Level.DEBUG);
}
@ -119,18 +134,23 @@ public class TestNameNodeMetrics {
private final Random rand = new Random();
private FSNamesystem namesystem;
private BlockManager bm;
private Path ecDir;
private static Path getTestPath(String fileName) {
return new Path(TEST_ROOT_DIR_PATH, fileName);
}
@Before
public void setUp() throws Exception {
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT)
.build();
cluster.waitActive();
namesystem = cluster.getNamesystem();
bm = namesystem.getBlockManager();
fs = cluster.getFileSystem();
ecDir = getTestPath("/ec");
fs.mkdirs(ecDir);
fs.setErasureCodingPolicy(ecDir, EC_POLICY.getName());
}
@After
@ -219,49 +239,125 @@ public class TestNameNodeMetrics {
/** Test metrics associated with addition of a file */
@Test
public void testFileAdd() throws Exception {
// Add files with 100 blocks
final Path file = getTestPath("testFileAdd");
createFile(file, 3200, (short)3);
// File creations
final long blockCount = 32;
final Path normalFile = getTestPath("testFileAdd");
createFile(normalFile, blockCount * BLOCK_SIZE, (short)3);
final Path ecFile = new Path(ecDir, "ecFile.log");
DFSTestUtil.createStripedFile(cluster, ecFile, null, (int) blockCount, 1,
false, EC_POLICY);
int blockCapacity = namesystem.getBlockCapacity();
assertGauge("BlockCapacity", blockCapacity, getMetrics(NS_METRICS));
MetricsRecordBuilder rb = getMetrics(NN_METRICS);
// File create operations is 1
// Number of files created is depth of <code>file</code> path
assertCounter("CreateFileOps", 1L, rb);
assertCounter("FilesCreated", (long)file.depth(), rb);
// File create operations are 2
assertCounter("CreateFileOps", 2L, rb);
// Number of files created is depth of normalFile and ecFile, after
// removing the duplicate accounting for root test dir.
assertCounter("FilesCreated",
(long)(normalFile.depth() + ecFile.depth()), rb);
long filesTotal = file.depth() + 1; // Add 1 for root
long filesTotal = normalFile.depth() + ecFile.depth() + 1 /* ecDir */;
rb = getMetrics(NS_METRICS);
assertGauge("FilesTotal", filesTotal, rb);
assertGauge("BlocksTotal", blockCount, rb);
fs.delete(file, true);
assertGauge("BlocksTotal", blockCount * 2, rb);
fs.delete(normalFile, true);
filesTotal--; // reduce the filecount for deleted file
rb = waitForDnMetricValue(NS_METRICS, "FilesTotal", filesTotal);
assertGauge("BlocksTotal", blockCount, rb);
assertGauge("PendingDeletionBlocks", 0L, rb);
fs.delete(ecFile, true);
filesTotal--;
rb = waitForDnMetricValue(NS_METRICS, "FilesTotal", filesTotal);
assertGauge("BlocksTotal", 0L, rb);
assertGauge("PendingDeletionBlocks", 0L, rb);
rb = getMetrics(NN_METRICS);
// Delete file operations and number of files deleted must be 1
assertCounter("DeleteFileOps", 1L, rb);
assertCounter("FilesDeleted", 1L, rb);
assertCounter("DeleteFileOps", 2L, rb);
assertCounter("FilesDeleted", 2L, rb);
}
/**
* Verify low redundancy and corrupt blocks metrics are zero.
* @throws Exception
*/
private void verifyZeroMetrics() throws Exception {
BlockManagerTestUtil.updateState(bm);
MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS,
"CorruptBlocks", 0L, 500);
// Verify aggregated blocks metrics
assertGauge("UnderReplicatedBlocks", 0L, rb); // Deprecated metric
assertGauge("LowRedundancyBlocks", 0L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb); // Deprecated metric
assertGauge("PendingReconstructionBlocks", 0L, rb);
// Verify replica metrics
assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
assertGauge("CorruptReplicatedBlocks", 0L, rb);
// Verify striped block groups metrics
assertGauge("LowRedundancyECBlockGroups", 0L, rb);
assertGauge("CorruptECBlockGroups", 0L, rb);
}
/**
* Verify aggregated metrics equals the sum of replicated blocks metrics
* and erasure coded blocks metrics.
* @throws Exception
*/
private void verifyAggregatedMetricsTally() throws Exception {
BlockManagerTestUtil.updateState(bm);
assertEquals("Under replicated metrics not matching!",
namesystem.getLowRedundancyBlocks(),
namesystem.getUnderReplicatedBlocks());
assertEquals("Low redundancy metrics not matching!",
namesystem.getLowRedundancyBlocks(),
namesystem.getLowRedundancyBlocksStat() +
namesystem.getLowRedundancyECBlockGroupsStat());
assertEquals("Corrupt blocks metrics not matching!",
namesystem.getCorruptReplicaBlocks(),
namesystem.getCorruptBlocksStat() +
namesystem.getCorruptECBlockGroupsStat());
assertEquals("Missing blocks metrics not matching!",
namesystem.getMissingBlocksCount(),
namesystem.getMissingBlocksStat() +
namesystem.getMissingECBlockGroupsStat());
assertEquals("Missing blocks with replication factor one not matching!",
namesystem.getMissingReplOneBlocksCount(),
namesystem.getMissingReplicationOneBlocksStat());
assertEquals("Bytes in future blocks metrics not matching!",
namesystem.getBytesInFuture(),
namesystem.getBlocksBytesInFutureStat() +
namesystem.getECBlocksBytesInFutureStat());
assertEquals("Pending deletion blocks metrics not matching!",
namesystem.getPendingDeletionBlocks(),
namesystem.getPendingDeletionBlocksStat() +
namesystem.getPendingDeletionECBlockGroupsStat());
}
/** Corrupt a block and ensure metrics reflects it */
@Test
public void testCorruptBlock() throws Exception {
// Create a file with single block with two replicas
final Path file = getTestPath("testCorruptBlock");
createFile(file, 100, (short)2);
final short replicaCount = 2;
createFile(file, 100, replicaCount);
DFSTestUtil.waitForReplication(fs, file, replicaCount, 15000);
// Disable the heartbeats, so that no corrupted replica
// can be fixed
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
verifyZeroMetrics();
verifyAggregatedMetricsTally();
// Corrupt the first replica of the block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
@ -272,12 +368,50 @@ public class TestNameNodeMetrics {
} finally {
cluster.getNamesystem().writeUnlock();
}
BlockManagerTestUtil.updateState(bm);
MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS,
"CorruptBlocks", 1L, 500);
// Verify aggregated blocks metrics
assertGauge("LowRedundancyBlocks", 1L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb);
assertGauge("PendingReconstructionBlocks", 0L, rb);
// Verify replicated blocks metrics
assertGauge("LowRedundancyReplicatedBlocks", 1L, rb);
assertGauge("CorruptReplicatedBlocks", 1L, rb);
// Verify striped blocks metrics
assertGauge("LowRedundancyECBlockGroups", 0L, rb);
assertGauge("CorruptECBlockGroups", 0L, rb);
verifyAggregatedMetricsTally();
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
}
// Start block reconstruction work
BlockManagerTestUtil.getComputedDatanodeWork(bm);
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 1L, rb);
assertGauge("PendingReplicationBlocks", 1L, rb);
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.waitForReplication(fs, file, replicaCount, 30000);
rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
// Verify aggregated blocks metrics
assertGauge("LowRedundancyBlocks", 0L, rb);
assertGauge("CorruptBlocks", 0L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb);
assertGauge("PendingReconstructionBlocks", 0L, rb);
// Verify replicated blocks metrics
assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
assertGauge("CorruptReplicatedBlocks", 0L, rb);
// Verify striped blocks metrics
assertGauge("LowRedundancyECBlockGroups", 0L, rb);
assertGauge("CorruptECBlockGroups", 0L, rb);
verifyAggregatedMetricsTally();
fs.delete(file, true);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
// During the file deletion, both BlockManager#corruptReplicas and
// BlockManager#pendingReplications will be updated, i.e., the records
// for the blocks of the deleted file will be removed from both
@ -287,11 +421,97 @@ public class TestNameNodeMetrics {
// BlockManager#updateState is called. And in
// BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks
// will also be updated.
rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L);
assertGauge("PendingReplicationBlocks", 0L, rb);
assertGauge("ScheduledReplicationBlocks", 0L, rb);
BlockManagerTestUtil.updateState(bm);
waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
verifyZeroMetrics();
verifyAggregatedMetricsTally();
}
@Test (timeout = 90000L)
public void testStripedFileCorruptBlocks() throws Exception {
final long fileLen = BLOCK_SIZE * 4;
final Path ecFile = new Path(ecDir, "ecFile.log");
DFSTestUtil.createFile(fs, ecFile, fileLen, (short) 1, 0L);
StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
// Disable the heartbeats, so that no corrupted replica
// can be fixed
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
verifyZeroMetrics();
verifyAggregatedMetricsTally();
// Corrupt the first internal block of the striped block group
LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations(
ecFile.toString(), 0, fileLen);
assert lbs.get(0) instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
cluster.getNamesystem().writeLock();
try {
bm.findAndMarkBlockAsCorrupt(bg.getBlock(), bg.getLocations()[0],
"STORAGE_ID", "TEST");
} finally {
cluster.getNamesystem().writeUnlock();
}
BlockManagerTestUtil.updateState(bm);
MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS,
"CorruptBlocks", 1L, 500);
// Verify aggregated blocks metrics
assertGauge("LowRedundancyBlocks", 1L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb);
assertGauge("PendingReconstructionBlocks", 0L, rb);
// Verify replica metrics
assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
assertGauge("CorruptReplicatedBlocks", 0L, rb);
// Verify striped block groups metrics
assertGauge("LowRedundancyECBlockGroups", 1L, rb);
assertGauge("CorruptECBlockGroups", 1L, rb);
verifyAggregatedMetricsTally();
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
}
// Start block reconstruction work
BlockManagerTestUtil.getComputedDatanodeWork(bm);
BlockManagerTestUtil.updateState(bm);
StripedFileTestUtil.waitForReconstructionFinished(ecFile, fs, 3);
rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
assertGauge("CorruptBlocks", 0L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb);
assertGauge("PendingReconstructionBlocks", 0L, rb);
// Verify replicated blocks metrics
assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
assertGauge("CorruptReplicatedBlocks", 0L, rb);
// Verify striped blocks metrics
assertGauge("LowRedundancyECBlockGroups", 0L, rb);
assertGauge("CorruptECBlockGroups", 0L, rb);
verifyAggregatedMetricsTally();
fs.delete(ecFile, true);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
// During the file deletion, both BlockManager#corruptReplicas and
// BlockManager#pendingReplications will be updated, i.e., the records
// for the blocks of the deleted file will be removed from both
// corruptReplicas and pendingReplications. The corresponding metrics
// (CorruptBlocks and PendingReplicationBlocks) are only updated when
// BlockManager#computeDatanodeWork runs and calls BlockManager#updateState;
// computeDatanodeWork also updates the ScheduledReplicationBlocks metric.
BlockManagerTestUtil.updateState(bm);
waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
verifyZeroMetrics();
verifyAggregatedMetricsTally();
}
/** Create excess blocks by reducing the replication factor
* of a file and ensure metrics reflect it
*/
@ -340,7 +560,7 @@ public class TestNameNodeMetrics {
private void waitForDeletion() throws InterruptedException {
// Wait for DATANODE_COUNT redundancy intervals to ensure all the blocks
// pending deletion are sent for deletion to the datanodes.
Thread.sleep(DFS_REDUNDANCY_INTERVAL * (DATANODE_COUNT + 1) * 1000);
Thread.sleep(DFS_REDUNDANCY_INTERVAL * DATANODE_COUNT * 1000);
}
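// For example (constant values here are assumptions about this test, not
// taken from this change): with DFS_REDUNDANCY_INTERVAL = 1 second and
// DATANODE_COUNT = 3, the sleep above amounts to 1 * 3 * 1000 ms = 3 seconds.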
/**
@ -358,20 +578,25 @@ public class TestNameNodeMetrics {
* @throws Exception if something went wrong.
*/
private MetricsRecordBuilder waitForDnMetricValue(String source,
String name,
long expected)
throws Exception {
String name, long expected) throws Exception {
// initial wait
waitForDeletion();
return waitForDnMetricValue(source, name, expected,
DFS_REDUNDANCY_INTERVAL * 500);
}
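// Usage sketch: callers that have already done their own waiting can poll at
// an explicit interval, e.g.
//   waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
// as done elsewhere in this test, while the three-argument overload above
// keeps the original waitForDeletion() pause before delegating.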
private MetricsRecordBuilder waitForDnMetricValue(String source,
String name, long expected, long sleepInterval) throws Exception {
MetricsRecordBuilder rb;
long gauge;
//initial wait.
waitForDeletion();
//lots of retries are allowed for slow systems; fast ones will still
//exit early
int retries = (DATANODE_COUNT + 1) * WAIT_GAUGE_VALUE_RETRIES;
// Lots of retries are allowed for slow systems.
// Fast ones will still exit early.
int retries = DATANODE_COUNT * WAIT_GAUGE_VALUE_RETRIES;
rb = getMetrics(source);
gauge = MetricsAsserts.getLongGauge(name, rb);
while (gauge != expected && (--retries > 0)) {
Thread.sleep(DFS_REDUNDANCY_INTERVAL * 500);
Thread.sleep(sleepInterval);
BlockManagerTestUtil.updateState(bm);
rb = getMetrics(source);
gauge = MetricsAsserts.getLongGauge(name, rb);
}
@ -516,22 +741,22 @@ public class TestNameNodeMetrics {
getMetrics(NS_METRICS));
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
assertGauge("LastWrittenTransactionId", 1L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
assertGauge("LastWrittenTransactionId", 3L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastCheckpoint", 3L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 3L, getMetrics(NS_METRICS));
fs.mkdirs(new Path(TEST_ROOT_DIR_PATH, "/tmp"));
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
assertGauge("LastWrittenTransactionId", 2L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastCheckpoint", 2L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 2L, getMetrics(NS_METRICS));
assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 4L, getMetrics(NS_METRICS));
cluster.getNameNodeRpc().rollEditLog();
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastCheckpoint", 6L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
@ -541,7 +766,7 @@ public class TestNameNodeMetrics {
long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
getMetrics(NS_METRICS));
assertTrue(lastCkptTime < newLastCkptTime);
assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
assertGauge("LastWrittenTransactionId", 8L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
}
@ -554,10 +779,10 @@ public class TestNameNodeMetrics {
public void testSyncAndBlockReportMetric() throws Exception {
MetricsRecordBuilder rb = getMetrics(NN_METRICS);
// We have one sync when the cluster starts up, just opening the journal
assertCounter("SyncsNumOps", 1L, rb);
assertCounter("SyncsNumOps", 3L, rb);
// Each datanode reports in when the cluster comes up
assertCounter("BlockReportNumOps",
(long)DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb);
(long) DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb);
// Sleep for an interval+slop to let the percentiles rollover
Thread.sleep((PERCENTILES_INTERVAL+1)*1000);