HDFS-9038. DFS reserved space is erroneously counted towards non-DFS used. (Brahma Reddy Battula)

This commit is contained in:
Arpit Agarwal 2016-09-06 13:37:21 -07:00
parent f6c0b7543f
commit 5f23abfa30
19 changed files with 219 additions and 42 deletions

View File

@ -43,6 +43,7 @@
public class DatanodeInfo extends DatanodeID implements Node {
private long capacity;
private long dfsUsed;
private long nonDfsUsed;
private long remaining;
private long blockPoolUsed;
private long cacheCapacity;
@ -89,6 +90,7 @@ public DatanodeInfo(DatanodeInfo from) {
super(from);
this.capacity = from.getCapacity();
this.dfsUsed = from.getDfsUsed();
this.nonDfsUsed = from.getNonDfsUsed();
this.remaining = from.getRemaining();
this.blockPoolUsed = from.getBlockPoolUsed();
this.cacheCapacity = from.getCacheCapacity();
@ -105,6 +107,7 @@ public DatanodeInfo(DatanodeID nodeID) {
super(nodeID);
this.capacity = 0L;
this.dfsUsed = 0L;
this.nonDfsUsed = 0L;
this.remaining = 0L;
this.blockPoolUsed = 0L;
this.cacheCapacity = 0L;
@ -158,10 +161,26 @@ public DatanodeInfo(final String ipAddr, final String hostName,
final int xceiverCount, final String networkLocation,
final AdminStates adminState,
final String upgradeDomain) {
super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
infoSecurePort, ipcPort);
this(ipAddr, hostName, datanodeUuid, xferPort, infoPort, infoSecurePort,
ipcPort, capacity, dfsUsed, 0L, remaining, blockPoolUsed,
cacheCapacity, cacheUsed, lastUpdate, lastUpdateMonotonic,
xceiverCount, networkLocation, adminState, upgradeDomain);
}
/** Constructor. */
public DatanodeInfo(final String ipAddr, final String hostName,
final String datanodeUuid, final int xferPort, final int infoPort,
final int infoSecurePort, final int ipcPort, final long capacity,
final long dfsUsed, final long nonDfsUsed, final long remaining,
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
final long lastUpdate, final long lastUpdateMonotonic,
final int xceiverCount, final String networkLocation,
final AdminStates adminState, final String upgradeDomain) {
super(ipAddr, hostName, datanodeUuid, xferPort, infoPort, infoSecurePort,
ipcPort);
this.capacity = capacity;
this.dfsUsed = dfsUsed;
this.nonDfsUsed = nonDfsUsed;
this.remaining = remaining;
this.blockPoolUsed = blockPoolUsed;
this.cacheCapacity = cacheCapacity;
@ -174,7 +193,7 @@ public DatanodeInfo(final String ipAddr, final String hostName,
this.upgradeDomain = upgradeDomain;
}
/** Network location name */
/** Network location name. */
@Override
public String getName() {
return getXferAddr();
@ -191,8 +210,7 @@ public String getName() {
/** The non-DFS used space by the data node. */
public long getNonDfsUsed() {
long nonDFSUsed = capacity - dfsUsed - remaining;
return nonDFSUsed < 0 ? 0 : nonDFSUsed;
return nonDfsUsed;
}
/** The used space by the data node as percentage of present capacity */
@ -282,6 +300,11 @@ public void setDfsUsed(long dfsUsed) {
this.dfsUsed = dfsUsed;
}
/** Sets the nondfs-used space for the datanode. */
public void setNonDfsUsed(long nonDfsUsed) {
this.nonDfsUsed = nonDfsUsed;
}
/** Sets raw free space. */
public void setRemaining(long remaining) {
this.remaining = remaining;

View File

@ -289,6 +289,7 @@ public static DatanodeInfoProto convert(DatanodeInfo info) {
.setId(convert((DatanodeID) info))
.setCapacity(info.getCapacity())
.setDfsUsed(info.getDfsUsed())
.setNonDfsUsed(info.getNonDfsUsed())
.setRemaining(info.getRemaining())
.setBlockPoolUsed(info.getBlockPoolUsed())
.setCacheCapacity(info.getCacheCapacity())
@ -581,15 +582,24 @@ static public Token<BlockTokenIdentifier>[] convertTokens(
}
static public DatanodeInfo convert(DatanodeInfoProto di) {
if (di == null) return null;
return new DatanodeInfo(
convert(di.getId()),
di.hasLocation() ? di.getLocation() : null,
di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
di.getLastUpdate(), di.getLastUpdateMonotonic(),
di.getXceiverCount(), convert(di.getAdminState()),
if (di == null) {
return null;
}
DatanodeInfo dinfo = new DatanodeInfo(convert(di.getId()),
di.hasLocation() ? di.getLocation() : null, di.getCapacity(),
di.getDfsUsed(), di.getRemaining(), di.getBlockPoolUsed(),
di.getCacheCapacity(), di.getCacheUsed(), di.getLastUpdate(),
di.getLastUpdateMonotonic(), di.getXceiverCount(),
convert(di.getAdminState()),
di.hasUpgradeDomain() ? di.getUpgradeDomain() : null);
if (di.hasNonDfsUsed()) {
dinfo.setNonDfsUsed(di.getNonDfsUsed());
} else {
// use the legacy way for older datanodes
long nonDFSUsed = di.getCapacity() - di.getDfsUsed() - di.getRemaining();
dinfo.setNonDfsUsed(nonDFSUsed < 0 ? 0 : nonDFSUsed);
}
return dinfo;
}
public static StorageType[] convertStorageTypes(
@ -1556,12 +1566,12 @@ public static StorageReport[] convertStorageReports(
}
public static StorageReport convert(StorageReportProto p) {
return new StorageReport(
p.hasStorage() ?
convert(p.getStorage()) :
new DatanodeStorage(p.getStorageUuid()),
p.getFailed(), p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
p.getBlockPoolUsed());
long nonDfsUsed = p.hasNonDfsUsed() ? p.getNonDfsUsed() : p.getCapacity()
- p.getDfsUsed() - p.getRemaining();
return new StorageReport(p.hasStorage() ? convert(p.getStorage())
: new DatanodeStorage(p.getStorageUuid()), p.getFailed(),
p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
p.getBlockPoolUsed(), nonDfsUsed);
}
public static DatanodeStorage convert(DatanodeStorageProto s) {
@ -2136,7 +2146,8 @@ public static StorageReportProto convert(StorageReport r) {
.setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
.setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
.setStorageUuid(r.getStorage().getStorageID())
.setStorage(convert(r.getStorage()));
.setStorage(convert(r.getStorage()))
.setNonDfsUsed(r.getNonDfsUsed());
return builder.build();
}

View File

@ -25,17 +25,19 @@ public class StorageReport {
private final boolean failed;
private final long capacity;
private final long dfsUsed;
private final long nonDfsUsed;
private final long remaining;
private final long blockPoolUsed;
public static final StorageReport[] EMPTY_ARRAY = {};
public StorageReport(DatanodeStorage storage, boolean failed,
long capacity, long dfsUsed, long remaining, long bpUsed) {
public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
long dfsUsed, long remaining, long bpUsed, long nonDfsUsed) {
this.storage = storage;
this.failed = failed;
this.capacity = capacity;
this.dfsUsed = dfsUsed;
this.nonDfsUsed = nonDfsUsed;
this.remaining = remaining;
this.blockPoolUsed = bpUsed;
}
@ -56,6 +58,10 @@ public long getDfsUsed() {
return dfsUsed;
}
/** Returns the non-DFS used value this report was constructed with. */
public long getNonDfsUsed() {
return nonDfsUsed;
}
public long getRemaining() {
return remaining;
}

View File

@ -101,6 +101,7 @@ message DatanodeInfoProto {
optional uint64 cacheUsed = 12 [default = 0];
optional uint64 lastUpdateMonotonic = 13 [default = 0];
optional string upgradeDomain = 14;
optional uint64 nonDfsUsed = 15;
}
/**
@ -125,6 +126,7 @@ message StorageReportProto {
optional uint64 remaining = 5 [ default = 0 ];
optional uint64 blockPoolUsed = 6 [ default = 0 ];
optional DatanodeStorageProto storage = 7; // supersedes StorageUuid
optional uint64 nonDfsUsed = 8;
}
/**

View File

@ -417,6 +417,7 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
long totalRemaining = 0;
long totalBlockPoolUsed = 0;
long totalDfsUsed = 0;
long totalNonDfsUsed = 0;
Set<DatanodeStorageInfo> failedStorageInfos = null;
// Decide if we should check for any missing StorageReport and mark it as
@ -477,6 +478,7 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
totalRemaining += report.getRemaining();
totalBlockPoolUsed += report.getBlockPoolUsed();
totalDfsUsed += report.getDfsUsed();
totalNonDfsUsed += report.getNonDfsUsed();
}
rollBlocksScheduled(getLastUpdateMonotonic());
@ -485,6 +487,7 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
setRemaining(totalRemaining);
setBlockPoolUsed(totalBlockPoolUsed);
setDfsUsed(totalDfsUsed);
setNonDfsUsed(totalNonDfsUsed);
if (checkFailedStorages) {
updateFailedStorage(failedStorageInfos);
}

View File

@ -35,6 +35,7 @@ class DatanodeStats {
private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
private long capacityTotal = 0L;
private long capacityUsed = 0L;
private long capacityUsedNonDfs = 0L;
private long capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int xceiverCount = 0;
@ -49,6 +50,7 @@ synchronized void add(final DatanodeDescriptor node) {
xceiverCount += node.getXceiverCount();
if (node.isInService()) {
capacityUsed += node.getDfsUsed();
capacityUsedNonDfs += node.getNonDfsUsed();
blockPoolUsed += node.getBlockPoolUsed();
nodesInService++;
nodesInServiceXceiverCount += node.getXceiverCount();
@ -77,6 +79,7 @@ synchronized void subtract(final DatanodeDescriptor node) {
xceiverCount -= node.getXceiverCount();
if (node.isInService()) {
capacityUsed -= node.getDfsUsed();
capacityUsedNonDfs -= node.getNonDfsUsed();
blockPoolUsed -= node.getBlockPoolUsed();
nodesInService--;
nodesInServiceXceiverCount -= node.getXceiverCount();
@ -159,8 +162,7 @@ synchronized float getPercentBlockPoolUsed() {
}
synchronized long getCapacityUsedNonDFS() {
final long nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
return nonDFSUsed < 0L? 0L : nonDFSUsed;
return capacityUsedNonDfs;
}
synchronized float getCapacityUsedPercent() {

View File

@ -93,6 +93,7 @@ public void updateFromStorage(DatanodeStorage storage) {
private long capacity;
private long dfsUsed;
private long nonDfsUsed;
private volatile long remaining;
private long blockPoolUsed;
@ -202,6 +203,10 @@ long getDfsUsed() {
return dfsUsed;
}
/** Returns the stored non-DFS used value (assigned in updateState). */
long getNonDfsUsed() {
return nonDfsUsed;
}
long getRemaining() {
return remaining;
}
@ -283,6 +288,7 @@ Iterator<BlockInfo> getBlockIterator() {
/**
 * Copies the usage figures from the given storage report into this object.
 *
 * @param r report supplying the capacity, DFS used, non-DFS used, remaining
 *          and block pool used values
 */
void updateState(StorageReport r) {
capacity = r.getCapacity();
dfsUsed = r.getDfsUsed();
nonDfsUsed = r.getNonDfsUsed();
// NOTE(review): remaining is the only volatile field written here; keep the
// statement order unless the visibility implications have been checked.
remaining = r.getRemaining();
blockPoolUsed = r.getBlockPoolUsed();
}
@ -332,7 +338,7 @@ public String toString() {
StorageReport toStorageReport() {
return new StorageReport(
new DatanodeStorage(storageID, state, storageType),
false, capacity, dfsUsed, remaining, blockPoolUsed);
false, capacity, dfsUsed, remaining, blockPoolUsed, nonDfsUsed);
}
/**

View File

@ -164,7 +164,8 @@ public StorageReport[] getStorageReports(String bpid)
volume.getCapacity(),
volume.getDfsUsed(),
volume.getAvailable(),
volume.getBlockPoolUsed(bpid));
volume.getBlockPoolUsed(bpid),
volume.getNonDfsUsed());
reports.add(sr);
} catch (ClosedChannelException e) {
continue;

View File

@ -385,15 +385,46 @@ public void setCapacityForTesting(long capacity) {
*/
@Override
public long getAvailable() throws IOException {
long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get();
long available = usage.getAvailable() - reserved
- reservedForReplicas.get();
long remaining = getCapacity() - getDfsUsed() - getReservedForReplicas();
long available = usage.getAvailable() - getRemainingReserved()
- getReservedForReplicas();
if (remaining > available) {
remaining = available;
}
return (remaining > 0) ? remaining : 0;
}
/**
 * Computes the space in use on this volume that is not DFS data: the used
 * figure reported by {@code usage} minus {@link #getDfsUsed()}.
 *
 * @return used space minus DFS used space; the result is not clamped
 * @throws IOException as declared; presumably raised by the underlying
 *         lookups
 */
long getActualNonDfsUsed() throws IOException {
  final long totalUsed = usage.getUsed();
  final long dfsPart = getDfsUsed();
  return totalUsed - dfsPart;
}
/**
 * Returns the part of {@code reserved} that actual non-DFS usage has not yet
 * consumed, or zero once that usage reaches or exceeds the reservation.
 *
 * @throws IOException if the actual non-DFS usage cannot be determined
 */
private long getRemainingReserved() throws IOException {
  final long nonDfs = getActualNonDfsUsed();
  return nonDfs < reserved ? reserved - nonDfs : 0L;
}
/**
 * Unplanned non-DFS usage, i.e. extra usage beyond the reserved space.
 *
 * @return the amount by which actual non-DFS usage exceeds {@code reserved},
 *         or zero when it is below the reservation
 * @throws IOException if the actual non-DFS usage cannot be determined
 */
public long getNonDfsUsed() throws IOException {
  final long nonDfs = getActualNonDfsUsed();
  return nonDfs >= reserved ? nonDfs - reserved : 0L;
}
/** Returns the available space exactly as reported by {@code usage}. */
@VisibleForTesting
long getDfAvailable() {
return usage.getAvailable();
}
@VisibleForTesting
public long getReservedForReplicas() {
return reservedForReplicas.get();

View File

@ -720,6 +720,28 @@ public void testBlockECRecoveryCommand() {
assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next());
}
/**
 * Checks that nonDfsUsed survives a DatanodeInfo to proto to DatanodeInfo
 * round trip, and that a proto built without the field still converts to the
 * same value (the figures satisfy capacity - dfsUsed - remaining = nonDfsUsed).
 */
@Test
public void testDataNodeInfoPBHelper() {
  final long capacity = 3500L;
  final long dfsUsed = 1000L;
  final long remaining = 500L;
  // Chosen so that capacity - dfsUsed - remaining equals this explicit value.
  final long nonDfsUsed = 2000L;

  DatanodeID id = DFSTestUtil.getLocalDatanodeID();
  DatanodeInfo expected = new DatanodeInfo(id);
  expected.setCapacity(capacity);
  expected.setDfsUsed(dfsUsed);
  expected.setNonDfsUsed(nonDfsUsed);
  expected.setRemaining(remaining);

  // Round trip with the nonDfsUsed field populated.
  HdfsProtos.DatanodeInfoProto proto = PBHelperClient.convert(expected);
  DatanodeInfo roundTripped = PBHelperClient.convert(proto);
  compare(expected, roundTripped);
  assertEquals(expected.getNonDfsUsed(), roundTripped.getNonDfsUsed());

  // Proto built without the nonDfsUsed field.
  HdfsProtos.DatanodeInfoProto.Builder legacyBuilder =
      HdfsProtos.DatanodeInfoProto.newBuilder();
  legacyBuilder
      .setId(PBHelperClient.convert(id))
      .setCapacity(capacity)
      .setDfsUsed(dfsUsed)
      .setRemaining(remaining);
  DatanodeInfo fromLegacy = PBHelperClient.convert(legacyBuilder.build());
  assertEquals(expected.getNonDfsUsed(), fromLegacy.getNonDfsUsed());
}
private void assertBlockECRecoveryInfoEquals(
BlockECReconstructionInfo blkECRecoveryInfo1,
BlockECReconstructionInfo blkECRecoveryInfo2) {
@ -768,4 +790,6 @@ private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,
compare(dnInfos1[i], dnInfos2[i]);
}
}
}

View File

@ -294,7 +294,7 @@ public static StorageReport[] getStorageReportsForDatanode(
StorageReport report = new StorageReport(
dns ,false, storage.getCapacity(),
storage.getDfsUsed(), storage.getRemaining(),
storage.getBlockPoolUsed());
storage.getBlockPoolUsed(), 0);
reports.add(report);
}
return reports.toArray(StorageReport.EMPTY_ARRAY);

View File

@ -1200,7 +1200,7 @@ public void testBlockManagerMachinesArray() throws Exception {
StorageReport report = new StorageReport(
dns, true, storageInfo.getCapacity(),
storageInfo.getDfsUsed(), storageInfo.getRemaining(),
storageInfo.getBlockPoolUsed());
storageInfo.getBlockPoolUsed(), 0L);
reports.add(report);
break;
}

View File

@ -441,7 +441,7 @@ DatanodeStorage getDnStorage() {
synchronized StorageReport getStorageReport(String bpid) {
return new StorageReport(dnStorage,
false, getCapacity(), getUsed(), getFree(),
map.get(bpid).getUsed());
map.get(bpid).getUsed(), 0L);
}
}

View File

@ -72,7 +72,7 @@ public DatanodeStorage getStorage(String storageUuid) {
@Override
public StorageReport[] getStorageReports(String bpid) throws IOException {
StorageReport[] result = new StorageReport[1];
result[0] = new StorageReport(storage, false, 0, 0, 0, 0);
result[0] = new StorageReport(storage, false, 0, 0, 0, 0, 0);
return result;
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@ -177,4 +178,43 @@ public void testDfsReservedForDifferentStorageTypes() throws IOException {
conf, StorageType.DEFAULT);
assertEquals("", 100L, volume4.getReserved());
}
/**
 * Verifies that FsVolumeImpl#getNonDfsUsed reports only the non-DFS usage
 * that exceeds the configured du.reserved value.
 *
 * Scenario:
 *   disk capacity          1000
 *   reserved (du.reserved)  100
 *   DFS used                200
 *   actual non-DFS used     300
 *   reserved for replicas    50
 *   expected non-DFS used   300 - 100 = 200
 */
@Test
public void testNonDfsUsedMetricForVolume() throws Exception {
  File volDir = new File(baseDir, "volume-0");
  volDir.mkdirs();

  final long diskCapacity = 1000L;
  final long duReserved = 100L;
  final long dfsUsage = 200L;
  final long actualNonDfsUsage = 300L;
  final long reservedForReplicas = 50L;

  conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, duReserved);
  FsVolumeImpl realVolume = new FsVolumeImpl(dataset, "storage-id", volDir,
      conf, StorageType.DEFAULT);
  FsVolumeImpl spyVolume = Mockito.spy(realVolume);

  // Capacity set on the volume excludes the reserved space.
  spyVolume.setCapacityForTesting(diskCapacity - duReserved);

  // Stub getDfAvailable() with what is left after DFS and non-DFS usage.
  final long dfAvailable = diskCapacity - dfsUsage - actualNonDfsUsage;
  Mockito.doReturn(dfAvailable).when(spyVolume).getDfAvailable();

  // Stub the DFS usage.
  Mockito.doReturn(dfsUsage).when(spyVolume).getDfsUsed();

  // Stub the space reserved for replicas.
  Mockito.doReturn(reservedForReplicas).when(spyVolume)
      .getReservedForReplicas();

  // Stub the raw non-DFS usage so the result does not depend on the host.
  Mockito.doReturn(actualNonDfsUsage).when(spyVolume)
      .getActualNonDfsUsed();

  final long expectedNonDfsUsage = actualNonDfsUsage - duReserved;
  assertEquals(expectedNonDfsUsage, spyVolume.getNonDfsUsed());
}
}

View File

@ -949,7 +949,7 @@ void sendHeartbeat() throws IOException {
// register datanode
// TODO:FEDERATION currently a single block pool is supported
StorageReport[] rep = { new StorageReport(storage, false,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null, true).getCommands();
if(cmds != null) {
@ -998,7 +998,7 @@ public int compareTo(String xferAddr) {
int replicateBlocks() throws IOException {
// register datanode
StorageReport[] rep = { new StorageReport(storage,
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
if (cmds != null) {

View File

@ -128,7 +128,7 @@ public void testDeadDatanode() throws Exception {
// that asks datanode to register again
StorageReport[] rep = { new StorageReport(
new DatanodeStorage(reg.getDatanodeUuid()),
false, 0, 0, 0, 0) };
false, 0, 0, 0, 0, 0) };
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
assertEquals(1, cmd.length);

View File

@ -71,7 +71,6 @@ public void testVolumeSize() throws Exception {
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
).getDatanodeManager();
@ -100,8 +99,9 @@ public void testVolumeSize() throws Exception {
+ " used " + used + " non DFS used " + nonDFSUsed
+ " remaining " + remaining + " perentUsed " + percentUsed
+ " percentRemaining " + percentRemaining);
assertTrue(configCapacity == (used + remaining + nonDFSUsed));
// There will be 5% space reserved in ext filesystem which is not
// considered.
assertTrue(configCapacity >= (used + remaining + nonDFSUsed));
assertTrue(percentUsed == DFSUtilClient.getPercentUsed(used,
configCapacity));
assertTrue(percentRemaining == DFSUtilClient.getPercentRemaining(
@ -148,7 +148,9 @@ public void testVolumeSize() throws Exception {
assertTrue(configCapacity == diskCapacity - reserved);
// Ensure new total capacity reported excludes the reserved space
assertTrue(configCapacity == (used + remaining + nonDFSUsed));
// There will be 5% space reserved in ext filesystem which is not
// considered.
assertTrue(configCapacity >= (used + remaining + nonDFSUsed));
// Ensure percent used is calculated based on used and present capacity
assertTrue(percentUsed == DFSUtilClient.getPercentUsed(used,
@ -160,9 +162,33 @@ public void testVolumeSize() throws Exception {
// Ensure percent used is calculated based on used and present capacity
assertTrue(percentRemaining == ((float)remaining * 100.0f)/(float)configCapacity);
//Adding testcase for non-dfs used where we need to consider
// reserved replica also.
final int fileCount = 5;
final DistributedFileSystem fs = cluster.getFileSystem();
// create streams and hsync to force datastreamers to start
DFSOutputStream[] streams = new DFSOutputStream[fileCount];
for (int i=0; i < fileCount; i++) {
streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i))
.getWrappedStream();
streams[i].write("1".getBytes());
streams[i].hsync();
}
triggerHeartbeats(cluster.getDataNodes());
assertTrue(configCapacity > (namesystem.getCapacityUsed() + namesystem
.getCapacityRemaining() + namesystem.getNonDfsUsedSpace()));
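// Non-DFS usage may have grown slightly in the meantime (e.g. test logs),
// so tolerate a small gap here. NOTE(review): this comment originally said
// 1MB, but the bound checked below is 1 * 1024 -- confirm the intended unit.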
assertTrue(
(namesystem.getCapacityUsed() + namesystem.getCapacityRemaining()
+ namesystem.getNonDfsUsedSpace() + fileCount * fs
.getDefaultBlockSize()) - configCapacity < 1 * 1024);
}
finally {
if (cluster != null) {cluster.shutdown();}
if (cluster != null) {
cluster.shutdown();
}
}
}

View File

@ -166,7 +166,9 @@ public void testCapacityMetrics() throws Exception {
MetricsAsserts.getLongGauge("CapacityRemaining", rb);
long capacityUsedNonDFS =
MetricsAsserts.getLongGauge("CapacityUsedNonDFS", rb);
assert(capacityUsed + capacityRemaining + capacityUsedNonDFS ==
// There will be 5% space reserved in ext filesystem which is not
// considered.
assert (capacityUsed + capacityRemaining + capacityUsedNonDFS <=
capacityTotal);
}