HDFS-5378. In CacheReport, don't send genstamp and length on the wire (Contributed by Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1534334 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 2013-10-21 19:29:46 +00:00
parent d61af97810
commit f9c08d02eb
20 changed files with 98 additions and 190 deletions
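
The change below removes the CacheReport wrapper class and the CacheReportProto message: cache capacity and usage now travel as two plain longs in the heartbeat, and the cache report itself carries only block IDs. As an illustrative sketch (not code from this patch; the helper class and method names are hypothetical, only the protocol calls come from the diffs below), a datanode-side caller of the revised DatanodeProtocol looks roughly like this:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
    import org.apache.hadoop.hdfs.server.protocol.StorageReport;

    // Hypothetical helper, shown only to illustrate the revised signatures.
    class CacheReportingSketch {
      static DatanodeCommand reportCacheState(DatanodeProtocol namenode,
          DatanodeRegistration registration, FsDatasetSpi<?> fsd,
          StorageReport[] reports, String bpid) throws IOException {
        // The heartbeat now carries two bare longs instead of a CacheReport[].
        namenode.sendHeartbeat(registration, reports,
            fsd.getDnCacheCapacity(), fsd.getDnCacheUsed(),
            0 /* xmitsInProgress */, 0 /* xceiverCount */, 0 /* failedVolumes */);
        // The cache report is now just the IDs of the cached blocks; the NameNode
        // recovers genstamp and length from its own BlockInfo for each ID.
        List<Long> cachedBlockIds = fsd.getCacheReport(bpid);
        return namenode.cacheReport(registration, bpid, cachedBlockIds);
      }
    }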

View File

@@ -69,6 +69,9 @@ HDFS-4949 (Unreleased)
     HDFS-5096. Automatically cache new data added to a cached path.
     (Contributed by Colin Patrick McCabe)
 
+    HDFS-5378. In CacheReport, don't send genstamp and length on the wire
+    (Contributed by Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@@ -21,8 +21,8 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -51,7 +51,6 @@
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -156,8 +155,9 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
   @Override
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
-      StorageReport[] reports, CacheReport[] cacheReports, int xmitsInProgress,
-      int xceiverCount, int failedVolumes) throws IOException {
+      StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed,
+      int xmitsInProgress, int xceiverCount, int failedVolumes)
+      throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -165,10 +165,12 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
     for (StorageReport r : reports) {
       builder.addReports(PBHelper.convert(r));
     }
-    for (CacheReport r : cacheReports) {
-      builder.addCacheReports(PBHelper.convert(r));
+    if (dnCacheCapacity != 0) {
+      builder.setDnCacheCapacity(dnCacheCapacity);
+    }
+    if (dnCacheUsed != 0) {
+      builder.setDnCacheUsed(dnCacheUsed);
     }
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
@@ -211,13 +213,13 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
   @Override
   public DatanodeCommand cacheReport(DatanodeRegistration registration,
-      String poolId, long[] blocks) throws IOException {
+      String poolId, List<Long> blockIds) throws IOException {
     CacheReportRequestProto.Builder builder =
         CacheReportRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setBlockPoolId(poolId);
-    for (int i=0; i<blocks.length; i++) {
-      builder.addBlocks(blocks[i]);
+    for (Long blockId : blockIds) {
+      builder.addBlocks(blockId);
     }
     CacheReportResponseProto resp;

View File

@@ -27,7 +27,6 @@
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
@@ -48,7 +47,6 @@
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -113,15 +111,9 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
             p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
             p.getBlockPoolUsed());
       }
-      List<CacheReportProto> cacheList = request.getCacheReportsList();
-      CacheReport[] cacheReport = new CacheReport[list.size()];
-      i = 0;
-      for (CacheReportProto p : cacheList) {
-        cacheReport[i++] = new CacheReport(p.getCacheCapacity(),
-            p.getCacheUsed());
-      }
       response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
-          report, cacheReport, request.getXmitsInProgress(),
+          report, request.getDnCacheCapacity(), request.getDnCacheUsed(),
+          request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes());
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -179,7 +171,7 @@ public CacheReportResponseProto cacheReport(RpcController controller,
       cmd = impl.cacheReport(
           PBHelper.convert(request.getRegistration()),
           request.getBlockPoolId(),
-          Longs.toArray(request.getBlocksList()));
+          request.getBlocksList());
     } catch (IOException e) {
       throw new ServiceException(e);
     }

View File

@@ -59,7 +59,6 @@
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
@@ -125,7 +124,6 @@
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1436,17 +1434,11 @@ private static State convert(StorageState state) {
   }
 
   public static StorageReportProto convert(StorageReport r) {
-    return StorageReportProto.newBuilder()
+    StorageReportProto.Builder builder = StorageReportProto.newBuilder()
         .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
         .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
-        .setStorageID(r.getStorageID()).build();
-  }
-
-  public static CacheReportProto convert(CacheReport r) {
-    return CacheReportProto.newBuilder()
-        .setCacheCapacity(r.getCapacity())
-        .setCacheUsed(r.getUsed())
-        .build();
+        .setStorageID(r.getStorageID());
+    return builder.build();
   }
 
   public static JournalInfo convert(JournalInfoProto info) {

View File

@@ -24,6 +24,7 @@
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -38,7 +39,6 @@
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -435,7 +435,7 @@ DatanodeCommand blockReport() throws IOException {
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
-    if (dn.getFSDataset().getCacheCapacity() == 0) {
+    if (dn.getFSDataset().getDnCacheCapacity() == 0) {
       return null;
     }
     // send cache report if timer has expired.
@@ -448,16 +448,15 @@ DatanodeCommand cacheReport() throws IOException {
       lastCacheReport = startTime;
 
       String bpid = bpos.getBlockPoolId();
-      BlockListAsLongs blocks = dn.getFSDataset().getCacheReport(bpid);
+      List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
       long createTime = Time.monotonicNow();
 
-      cmd = bpNamenode.cacheReport(bpRegistration, bpid,
-          blocks.getBlockListAsLongs());
+      cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
       long sendTime = Time.monotonicNow();
       long createCost = createTime - startTime;
       long sendCost = sendTime - createTime;
       dn.getMetrics().addCacheReport(sendCost);
-      LOG.info("CacheReport of " + blocks.getNumberOfBlocks()
+      LOG.info("CacheReport of " + blockIds.size()
           + " blocks took " + createCost + " msec to generate and "
          + sendCost + " msecs for RPC and NN processing");
     }
@@ -475,10 +474,9 @@ HeartbeatResponse sendHeartBeat() throws IOException {
         dn.getFSDataset().getDfsUsed(),
         dn.getFSDataset().getRemaining(),
         dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
-    CacheReport[] cacheReport = { new CacheReport(
-        dn.getFSDataset().getCacheCapacity(),
-        dn.getFSDataset().getCacheUsed()) };
-    return bpNamenode.sendHeartbeat(bpRegistration, report, cacheReport,
+    return bpNamenode.sendHeartbeat(bpRegistration, report,
+        dn.getFSDataset().getDnCacheCapacity(),
+        dn.getFSDataset().getDnCacheUsed(),
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         dn.getFSDataset().getNumFailedVolumes());

View File

@@ -270,12 +270,12 @@ public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
   public BlockListAsLongs getBlockReport(String bpid);
 
   /**
-   * Returns the cache report - the full list of cached blocks of a
-   * block pool
+   * Returns the cache report - the full list of cached block IDs of a
+   * block pool.
    * @param bpid Block Pool Id
-   * @return - the cache report - the full list of cached blocks
+   * @return the cache report - the full list of cached block IDs.
    */
-  public BlockListAsLongs getCacheReport(String bpid);
+  public List<Long> getCacheReport(String bpid);
 
   /** Does the dataset contain the block? */
   public boolean contains(ExtendedBlock block);

View File

@@ -85,14 +85,14 @@ boolean isCached(String bpid, long blockId) {
    * @return List of cached blocks suitable for translation into a
    *         {@link BlockListAsLongs} for a cache report.
    */
-  List<Block> getCachedBlocks(String bpid) {
-    List<Block> blocks = new ArrayList<Block>();
+  List<Long> getCachedBlocks(String bpid) {
+    List<Long> blocks = new ArrayList<Long>();
     // ConcurrentHashMap iteration doesn't see latest updates, which is okay
     Iterator<MappableBlock> it = cachedBlocks.values().iterator();
     while (it.hasNext()) {
       MappableBlock mapBlock = it.next();
       if (mapBlock.getBlockPoolId().equals(bpid)) {
-        blocks.add(mapBlock.getBlock());
+        blocks.add(mapBlock.getBlock().getBlockId());
       }
     }
     return blocks;
@@ -213,15 +213,11 @@ public void run() {
   // Stats related methods for FsDatasetMBean
 
-  public long getCacheUsed() {
+  public long getDnCacheUsed() {
     return usedBytes.get();
   }
 
-  public long getCacheCapacity() {
+  public long getDnCacheCapacity() {
     return maxBytes;
   }
-
-  public long getCacheRemaining() {
-    return maxBytes - usedBytes.get();
-  }
 }

View File

@@ -294,24 +294,16 @@ public int getNumFailedVolumes() {
    * Returns the total cache used by the datanode (in bytes).
    */
   @Override // FSDatasetMBean
-  public long getCacheUsed() {
-    return cacheManager.getCacheUsed();
+  public long getDnCacheUsed() {
+    return cacheManager.getDnCacheUsed();
   }
 
   /**
    * Returns the total cache capacity of the datanode (in bytes).
    */
   @Override // FSDatasetMBean
-  public long getCacheCapacity() {
-    return cacheManager.getCacheCapacity();
-  }
-
-  /**
-   * Returns the total amount of cache remaining (in bytes).
-   */
-  @Override // FSDatasetMBean
-  public long getCacheRemaining() {
-    return cacheManager.getCacheRemaining();
+  public long getDnCacheCapacity() {
+    return cacheManager.getDnCacheCapacity();
   }
 
   /**
@@ -1031,8 +1023,8 @@ public BlockListAsLongs getBlockReport(String bpid) {
   }
 
   @Override // FsDatasetSpi
-  public BlockListAsLongs getCacheReport(String bpid) {
-    return new BlockListAsLongs(cacheManager.getCachedBlocks(bpid), null);
+  public List<Long> getCacheReport(String bpid) {
+    return cacheManager.getCachedBlocks(bpid);
   }
 
   /**

View File

@@ -81,15 +81,10 @@ public interface FSDatasetMBean {
   /**
    * Returns the total cache used by the datanode (in bytes).
    */
-  public long getCacheUsed();
+  public long getDnCacheUsed();
 
   /**
    * Returns the total cache capacity of the datanode (in bytes).
    */
-  public long getCacheCapacity();
-
-  /**
-   * Returns the total amount of cache remaining (in bytes).
-   */
-  public long getCacheRemaining();
+  public long getDnCacheCapacity();
 }

View File

@@ -538,11 +538,11 @@ public void setCachedLocations(LocatedBlock block) {
   }
 
   public final void processCacheReport(final DatanodeID datanodeID,
-      final BlockListAsLongs report) throws IOException {
+      final List<Long> blockIds) throws IOException {
     if (!enabled) {
       LOG.info("Ignoring cache report from " + datanodeID +
           " because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " +
-          "number of blocks: " + report.getNumberOfBlocks());
+          "number of blocks: " + blockIds.size());
       return;
     }
     namesystem.writeLock();
@@ -555,7 +555,7 @@ public final void processCacheReport(final DatanodeID datanodeID,
         throw new IOException(
             "processCacheReport from dead or unregistered datanode: " + datanode);
       }
-      processCacheReportImpl(datanode, report);
+      processCacheReportImpl(datanode, blockIds);
     } finally {
       endTime = Time.monotonicNow();
       namesystem.writeUnlock();
@@ -567,22 +567,16 @@ public final void processCacheReport(final DatanodeID datanodeID,
       metrics.addCacheBlockReport((int) (endTime - startTime));
     }
     LOG.info("Processed cache report from "
-        + datanodeID + ", blocks: " + report.getNumberOfBlocks()
+        + datanodeID + ", blocks: " + blockIds.size()
         + ", processing time: " + (endTime - startTime) + " msecs");
   }
 
   private void processCacheReportImpl(final DatanodeDescriptor datanode,
-      final BlockListAsLongs report) {
+      final List<Long> blockIds) {
     CachedBlocksList cached = datanode.getCached();
     cached.clear();
-    BlockReportIterator itBR = report.getBlockReportIterator();
-    while (itBR.hasNext()) {
-      Block block = itBR.next();
-      ReplicaState iState = itBR.getCurrentReplicaState();
-      if (iState != ReplicaState.FINALIZED) {
-        LOG.error("Cached block report contained unfinalized block " + block);
-        continue;
-      }
+    for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
+      Block block = new Block(iter.next());
       BlockInfo blockInfo = blockManager.getStoredBlock(block);
       if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
         // The NameNode will eventually remove or update the out-of-date block.
@@ -591,6 +585,12 @@ private void processCacheReportImpl(final DatanodeDescriptor datanode,
             block + ": expected genstamp " + blockInfo.getGenerationStamp());
         continue;
       }
+      if (!blockInfo.isComplete()) {
+        LOG.warn("Ignoring block id " + block.getBlockId() + ", because " +
+            "it is not complete yet. It is in state " +
+            blockInfo.getBlockUCState());
+        continue;
+      }
       Collection<DatanodeDescriptor> corruptReplicas =
           blockManager.getCorruptReplicas(blockInfo);
       if ((corruptReplicas != null) && corruptReplicas.contains(datanode)) {

View File

@@ -102,7 +102,6 @@
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -962,13 +961,14 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
   @Override // DatanodeProtocol
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
-      StorageReport[] report, CacheReport[] cacheReport, int xmitsInProgress,
-      int xceiverCount, int failedVolumes) throws IOException {
+      StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
+      int xmitsInProgress, int xceiverCount,
+      int failedVolumes) throws IOException {
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(),
         report[0].getDfsUsed(), report[0].getRemaining(),
-        report[0].getBlockPoolUsed(), cacheReport[0].getCapacity(),
-        cacheReport[0].getUsed(), xceiverCount, xmitsInProgress, failedVolumes);
+        report[0].getBlockPoolUsed(), dnCacheCapacity, dnCacheUsed,
+        xceiverCount, xmitsInProgress, failedVolumes);
   }
 
   @Override // DatanodeProtocol
@@ -990,15 +990,13 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
   @Override
   public DatanodeCommand cacheReport(DatanodeRegistration nodeReg,
-      String poolId, long[] blocks) throws IOException {
+      String poolId, List<Long> blockIds) throws IOException {
     verifyRequest(nodeReg);
-    BlockListAsLongs blist = new BlockListAsLongs(blocks);
     if (blockStateChangeLog.isDebugEnabled()) {
       blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
-          + "from " + nodeReg + " " + blist.getNumberOfBlocks()
-          + " blocks");
+          + "from " + nodeReg + " " + blockIds.size() + " blocks");
     }
-    namesystem.getCacheManager().processCacheReport(nodeReg, blist);
+    namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
     return null;
   }

View File

@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.protocol;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Utilization report for a Datanode cache
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class CacheReport {
-  private final long capacity;
-  private final long used;
-
-  public CacheReport(long capacity, long used) {
-    this.capacity = capacity;
-    this.used = used;
-  }
-
-  public long getCapacity() {
-    return capacity;
-  }
-
-  public long getUsed() {
-    return used;
-  }
-}

View File

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.protocol;
 
 import java.io.*;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -106,7 +107,8 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
   @Idempotent
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
                                        StorageReport[] reports,
-                                       CacheReport[] cacheReports,
+                                       long dnCacheCapacity,
+                                       long dnCacheUsed,
                                        int xmitsInProgress,
                                        int xceiverCount,
                                        int failedVolumes) throws IOException;
@@ -139,16 +141,15 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
    * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
    * which is used to communicated blocks stored on disk.
    *
-   * @param registration
-   * @param poolId block pool ID for the blocks
-   * @param blocks a Long[] array from {@link BlockListAsLongs} that describes
-   * the list of cached blocks. This is more memory-efficient than a Block[].
-   * @return
+   * @param registration The datanode registration.
+   * @param poolId The block pool ID for the blocks.
+   * @param blockIds A list of block IDs.
+   * @return The DatanodeCommand.
    * @throws IOException
    */
   @Idempotent
   public DatanodeCommand cacheReport(DatanodeRegistration registration,
-      String poolId, long[] blocks) throws IOException;
+      String poolId, List<Long> blockIds) throws IOException;
 
   /**
    * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about

View File

@@ -188,7 +188,8 @@ message HeartbeatRequestProto {
   optional uint32 xmitsInProgress = 3 [ default = 0 ];
   optional uint32 xceiverCount = 4 [ default = 0 ];
   optional uint32 failedVolumes = 5 [ default = 0 ];
-  repeated CacheReportProto cacheReports = 6;
+  optional uint64 dnCacheCapacity = 6 [ default = 0 ];
+  optional uint64 dnCacheUsed = 7 [ default = 0 ];
 }
 
 message StorageReportProto {
@@ -200,11 +201,6 @@ message StorageReportProto {
   optional uint64 blockPoolUsed = 6 [ default = 0 ];
 }
 
-message CacheReportProto {
-  optional uint64 cacheCapacity = 1 [ default = 0 ];
-  optional uint64 cacheUsed = 2 [ default = 0 ];
-}
-
 /**
  * state - State the NN is in when returning response to the DN
  * txid - Highest transaction ID this NN has seen
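
This wire-format change is where the genstamp and length actually drop off: HeartbeatRequestProto replaces the repeated CacheReportProto with two optional uint64 fields, and the cache report's repeated blocks field (populated via builder.addBlocks(blockId) in the client-side translator above) now carries a single numeric entry, the block ID, per cached block instead of a BlockListAsLongs-style encoding. A rough back-of-the-envelope comparison, assuming the old encoding spent about three longs per finalized block (block ID, length, genstamp) and ignoring protobuf varint packing:

    // Standalone sketch of the payload difference; the 3-longs-per-block figure
    // for the old BlockListAsLongs encoding is an assumption stated above.
    public class CacheReportPayloadSketch {
      public static void main(String[] args) {
        long cachedBlocks = 1_000_000L;   // hypothetical number of cached replicas
        long bytesPerLong = 8L;
        long oldBytes = cachedBlocks * 3 * bytesPerLong; // id + length + genstamp
        long newBytes = cachedBlocks * bytesPerLong;     // block id only
        System.out.println("old ~" + oldBytes + " bytes, new ~" + newBytes + " bytes");
      }
    }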

View File

@@ -24,6 +24,7 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -466,8 +467,8 @@ public synchronized BlockListAsLongs getBlockReport(String bpid) {
   }
 
   @Override // FsDatasetSpi
-  public BlockListAsLongs getCacheReport(String bpid) {
-    return new BlockListAsLongs();
+  public List<Long> getCacheReport(String bpid) {
+    return new LinkedList<Long>();
   }
 
   @Override // FSDatasetMBean
@@ -496,17 +497,12 @@ public int getNumFailedVolumes() {
   }
 
   @Override // FSDatasetMBean
-  public long getCacheUsed() {
+  public long getDnCacheUsed() {
     return 0l;
   }
 
   @Override // FSDatasetMBean
-  public long getCacheCapacity() {
-    return 0l;
-  }
-
-  @Override // FSDatasetMBean
-  public long getCacheRemaining() {
+  public long getDnCacheCapacity() {
     return 0l;
   }

View File

@@ -39,7 +39,6 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -127,7 +126,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
         .when(mock).sendHeartbeat(
             Mockito.any(DatanodeRegistration.class),
             Mockito.any(StorageReport[].class),
-            Mockito.any(CacheReport[].class),
+            Mockito.anyLong(),
+            Mockito.anyLong(),
             Mockito.anyInt(),
             Mockito.anyInt(),
             Mockito.anyInt());

View File

@@ -67,7 +67,6 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -155,7 +154,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
     when(namenode.sendHeartbeat(
         Mockito.any(DatanodeRegistration.class),
         Mockito.any(StorageReport[].class),
-        Mockito.any(CacheReport[].class),
+        Mockito.anyLong(),
+        Mockito.anyLong(),
         Mockito.anyInt(),
         Mockito.anyInt(),
         Mockito.anyInt()))

View File

@@ -19,6 +19,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doReturn;
@@ -43,7 +44,6 @@
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -109,8 +109,7 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds)
         fsImage.getLastAppliedOrWrittenTxId()));
     doReturn(response).when(spyNN).sendHeartbeat(
         (DatanodeRegistration) any(),
-        (StorageReport[]) any(),
-        (CacheReport[]) any(),
+        (StorageReport[]) any(), anyLong(), anyLong(),
         anyInt(), anyInt(), anyInt());
   }
@@ -166,15 +165,11 @@ private static long[] getBlockSizes(HdfsBlockLocation[] locs)
    * Blocks until cache usage hits the expected new value.
    */
   private long verifyExpectedCacheUsage(final long expected) throws Exception {
-    long cacheUsed = fsd.getCacheUsed();
+    long cacheUsed = fsd.getDnCacheUsed();
     while (cacheUsed != expected) {
-      cacheUsed = fsd.getCacheUsed();
+      cacheUsed = fsd.getDnCacheUsed();
       Thread.sleep(100);
     }
-    long cacheCapacity = fsd.getCacheCapacity();
-    long cacheRemaining = fsd.getCacheRemaining();
-    assertEquals("Sum of used and remaining cache does not equal total",
-        cacheCapacity, cacheUsed+cacheRemaining);
     assertEquals("Unexpected amount of cache used", expected, cacheUsed);
     return cacheUsed;
   }
@@ -195,8 +190,8 @@ public void testCacheAndUncacheBlock() throws Exception {
     final long[] blockSizes = getBlockSizes(locs);
 
     // Check initial state
-    final long cacheCapacity = fsd.getCacheCapacity();
-    long cacheUsed = fsd.getCacheUsed();
+    final long cacheCapacity = fsd.getDnCacheCapacity();
+    long cacheUsed = fsd.getDnCacheUsed();
     long current = 0;
     assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
     assertEquals("Unexpected amount of cache used", current, cacheUsed);
@@ -257,7 +252,7 @@ public void testFilesExceedMaxLockedMemory() throws Exception {
     // Uncache the cached part of the nth file
     setHeartbeatResponse(uncacheBlocks(fileLocs[numFiles-1]));
-    while (fsd.getCacheUsed() != oldCurrent) {
+    while (fsd.getDnCacheUsed() != oldCurrent) {
       Thread.sleep(100);
     }

View File

@@ -845,9 +845,8 @@ void sendHeartbeat() throws IOException {
       // TODO:FEDERATION currently a single block pool is supported
       StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      CacheReport[] cacheRep = { new CacheReport(0l, 0l) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
-          rep, cacheRep, 0, 0, 0).getCommands();
+      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
+          0L, 0L, 0, 0, 0).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -893,9 +892,8 @@ int replicateBlocks() throws IOException {
       // register datanode
       StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      CacheReport[] cacheRep = { new CacheReport(0l, 0l) };
       DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
-          rep, cacheRep, 0, 0, 0).getCommands();
+          rep, 0L, 0L, 0, 0, 0).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

View File

@@ -34,7 +34,6 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.protocol.CacheReport;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -143,8 +142,7 @@ public void testDeadDatanode() throws Exception {
     // that asks datanode to register again
     StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0,
         0, 0) };
-    CacheReport[] cacheRep = { new CacheReport(0l, 0l) };
-    DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, cacheRep, 0, 0, 0)
+    DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0)
         .getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER