From f9c08d02ebe4a5477cf5d753f0d9d48fc6f9fa48 Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Mon, 21 Oct 2013 19:29:46 +0000 Subject: [PATCH] HDFS-5378. In CacheReport, don't send genstamp and length on the wire (Contributed by Colin Patrick McCabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1534334 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES-HDFS-4949.txt | 3 ++ ...atanodeProtocolClientSideTranslatorPB.java | 22 +++++----- ...atanodeProtocolServerSideTranslatorPB.java | 14 ++---- .../hadoop/hdfs/protocolPB/PBHelper.java | 14 ++---- .../hdfs/server/datanode/BPServiceActor.java | 18 ++++---- .../datanode/fsdataset/FsDatasetSpi.java | 10 ++--- .../fsdataset/impl/FsDatasetCache.java | 14 +++--- .../fsdataset/impl/FsDatasetImpl.java | 20 +++------ .../datanode/metrics/FSDatasetMBean.java | 9 +--- .../hdfs/server/namenode/CacheManager.java | 26 +++++------ .../server/namenode/NameNodeRpcServer.java | 18 ++++---- .../hdfs/server/protocol/CacheReport.java | 44 ------------------- .../server/protocol/DatanodeProtocol.java | 15 ++++--- .../src/main/proto/DatanodeProtocol.proto | 8 +--- .../server/datanode/SimulatedFSDataset.java | 14 +++--- .../server/datanode/TestBPOfferService.java | 4 +- .../server/datanode/TestBlockRecovery.java | 4 +- .../server/datanode/TestFsDatasetCache.java | 19 +++----- .../namenode/NNThroughputBenchmark.java | 8 ++-- .../server/namenode/TestDeadDatanode.java | 4 +- 20 files changed, 98 insertions(+), 190 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/CacheReport.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt index 4c159ba6dd..5da36d0270 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt @@ -69,6 +69,9 @@ HDFS-4949 (Unreleased) HDFS-5096. Automatically cache new data added to a cached path. (Contributed by Colin Patrick McCabe) + HDFS-5378. In CacheReport, don't send genstamp and length on the wire + (Contributed by Colin Patrick McCabe) + OPTIMIZATIONS HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 1578d24e90..f12a92ff0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -21,8 +21,8 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -51,7 +51,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -156,8 +155,9 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration @Override public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, - StorageReport[] reports, CacheReport[] cacheReports, int xmitsInProgress, - int xceiverCount, int failedVolumes) throws IOException { + StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed, + int xmitsInProgress, int xceiverCount, int failedVolumes) + throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) @@ -165,10 +165,12 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, for (StorageReport r : reports) { builder.addReports(PBHelper.convert(r)); } - for (CacheReport r : cacheReports) { - builder.addCacheReports(PBHelper.convert(r)); + if (dnCacheCapacity != 0) { + builder.setDnCacheCapacity(dnCacheCapacity); + } + if (dnCacheUsed != 0) { + builder.setDnCacheUsed(dnCacheUsed); } - HeartbeatResponseProto resp; try { resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()); @@ -211,13 +213,13 @@ public DatanodeCommand blockReport(DatanodeRegistration registration, @Override public DatanodeCommand cacheReport(DatanodeRegistration registration, - String poolId, long[] blocks) throws IOException { + String poolId, List blockIds) throws IOException { CacheReportRequestProto.Builder builder = CacheReportRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setBlockPoolId(poolId); - for (int i=0; i cacheList = request.getCacheReportsList(); - CacheReport[] cacheReport = new CacheReport[list.size()]; - i = 0; - for (CacheReportProto p : cacheList) { - cacheReport[i++] = new CacheReport(p.getCacheCapacity(), - p.getCacheUsed()); - } response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), - report, cacheReport, request.getXmitsInProgress(), + report, request.getDnCacheCapacity(), request.getDnCacheUsed(), + request.getXmitsInProgress(), request.getXceiverCount(), request.getFailedVolumes()); } catch (IOException e) { throw new ServiceException(e); @@ -179,7 +171,7 @@ public CacheReportResponseProto cacheReport(RpcController controller, cmd = impl.cacheReport( PBHelper.convert(request.getRegistration()), request.getBlockPoolId(), - Longs.toArray(request.getBlocksList())); + request.getBlocksList()); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 2758611793..4626dc0687 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto; @@ -125,7 +124,6 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; -import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -1436,17 +1434,11 @@ private static State convert(StorageState state) { } public static StorageReportProto convert(StorageReport r) { - return StorageReportProto.newBuilder() + StorageReportProto.Builder builder = StorageReportProto.newBuilder() .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity()) .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining()) - .setStorageID(r.getStorageID()).build(); - } - - public static CacheReportProto convert(CacheReport r) { - return CacheReportProto.newBuilder() - .setCacheCapacity(r.getCapacity()) - .setCacheUsed(r.getUsed()) - .build(); + .setStorageID(r.getStorageID()); + return builder.build(); } public static JournalInfo convert(JournalInfoProto info) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index b96292410e..73d3cdffad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -24,6 +24,7 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.util.Collection; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -38,7 +39,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -435,7 +435,7 @@ DatanodeCommand blockReport() throws IOException { DatanodeCommand cacheReport() throws IOException { // If caching is disabled, do not send a cache report - if (dn.getFSDataset().getCacheCapacity() == 0) { + if (dn.getFSDataset().getDnCacheCapacity() == 0) { return null; } // send cache report if timer has expired. @@ -448,16 +448,15 @@ DatanodeCommand cacheReport() throws IOException { lastCacheReport = startTime; String bpid = bpos.getBlockPoolId(); - BlockListAsLongs blocks = dn.getFSDataset().getCacheReport(bpid); + List blockIds = dn.getFSDataset().getCacheReport(bpid); long createTime = Time.monotonicNow(); - cmd = bpNamenode.cacheReport(bpRegistration, bpid, - blocks.getBlockListAsLongs()); + cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds); long sendTime = Time.monotonicNow(); long createCost = createTime - startTime; long sendCost = sendTime - createTime; dn.getMetrics().addCacheReport(sendCost); - LOG.info("CacheReport of " + blocks.getNumberOfBlocks() + LOG.info("CacheReport of " + blockIds.size() + " blocks took " + createCost + " msec to generate and " + sendCost + " msecs for RPC and NN processing"); } @@ -475,10 +474,9 @@ HeartbeatResponse sendHeartBeat() throws IOException { dn.getFSDataset().getDfsUsed(), dn.getFSDataset().getRemaining(), dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) }; - CacheReport[] cacheReport = { new CacheReport( - dn.getFSDataset().getCacheCapacity(), - dn.getFSDataset().getCacheUsed()) }; - return bpNamenode.sendHeartbeat(bpRegistration, report, cacheReport, + return bpNamenode.sendHeartbeat(bpRegistration, report, + dn.getFSDataset().getDnCacheCapacity(), + dn.getFSDataset().getDnCacheUsed(), dn.getXmitsInProgress(), dn.getXceiverCount(), dn.getFSDataset().getNumFailedVolumes()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 07f0e72aad..4c7edd7ca4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -270,12 +270,12 @@ public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen public BlockListAsLongs getBlockReport(String bpid); /** - * Returns the cache report - the full list of cached blocks of a - * block pool - * @param bpid Block Pool Id - * @return - the cache report - the full list of cached blocks + * Returns the cache report - the full list of cached block IDs of a + * block pool. + * @param bpid Block Pool Id + * @return the cache report - the full list of cached block IDs. */ - public BlockListAsLongs getCacheReport(String bpid); + public List getCacheReport(String bpid); /** Does the dataset contain the block? */ public boolean contains(ExtendedBlock block); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index dd05c18bfc..6a68e633e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -85,14 +85,14 @@ boolean isCached(String bpid, long blockId) { * @return List of cached blocks suitable for translation into a * {@link BlockListAsLongs} for a cache report. */ - List getCachedBlocks(String bpid) { - List blocks = new ArrayList(); + List getCachedBlocks(String bpid) { + List blocks = new ArrayList(); // ConcurrentHashMap iteration doesn't see latest updates, which is okay Iterator it = cachedBlocks.values().iterator(); while (it.hasNext()) { MappableBlock mapBlock = it.next(); if (mapBlock.getBlockPoolId().equals(bpid)) { - blocks.add(mapBlock.getBlock()); + blocks.add(mapBlock.getBlock().getBlockId()); } } return blocks; @@ -213,15 +213,11 @@ public void run() { // Stats related methods for FsDatasetMBean - public long getCacheUsed() { + public long getDnCacheUsed() { return usedBytes.get(); } - public long getCacheCapacity() { + public long getDnCacheCapacity() { return maxBytes; } - - public long getCacheRemaining() { - return maxBytes - usedBytes.get(); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index be664fd76b..010be39518 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -294,24 +294,16 @@ public int getNumFailedVolumes() { * Returns the total cache used by the datanode (in bytes). */ @Override // FSDatasetMBean - public long getCacheUsed() { - return cacheManager.getCacheUsed(); + public long getDnCacheUsed() { + return cacheManager.getDnCacheUsed(); } /** * Returns the total cache capacity of the datanode (in bytes). */ @Override // FSDatasetMBean - public long getCacheCapacity() { - return cacheManager.getCacheCapacity(); - } - - /** - * Returns the total amount of cache remaining (in bytes). - */ - @Override // FSDatasetMBean - public long getCacheRemaining() { - return cacheManager.getCacheRemaining(); + public long getDnCacheCapacity() { + return cacheManager.getDnCacheCapacity(); } /** @@ -1031,8 +1023,8 @@ public BlockListAsLongs getBlockReport(String bpid) { } @Override // FsDatasetSpi - public BlockListAsLongs getCacheReport(String bpid) { - return new BlockListAsLongs(cacheManager.getCachedBlocks(bpid), null); + public List getCacheReport(String bpid) { + return cacheManager.getCachedBlocks(bpid); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java index 44325ce9bb..82757d065b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java @@ -81,15 +81,10 @@ public interface FSDatasetMBean { /** * Returns the total cache used by the datanode (in bytes). */ - public long getCacheUsed(); + public long getDnCacheUsed(); /** * Returns the total cache capacity of the datanode (in bytes). */ - public long getCacheCapacity(); - - /** - * Returns the total amount of cache remaining (in bytes). - */ - public long getCacheRemaining(); + public long getDnCacheCapacity(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index 8931078f3d..bda729cfc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -538,11 +538,11 @@ public void setCachedLocations(LocatedBlock block) { } public final void processCacheReport(final DatanodeID datanodeID, - final BlockListAsLongs report) throws IOException { + final List blockIds) throws IOException { if (!enabled) { LOG.info("Ignoring cache report from " + datanodeID + " because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " + - "number of blocks: " + report.getNumberOfBlocks()); + "number of blocks: " + blockIds.size()); return; } namesystem.writeLock(); @@ -555,7 +555,7 @@ public final void processCacheReport(final DatanodeID datanodeID, throw new IOException( "processCacheReport from dead or unregistered datanode: " + datanode); } - processCacheReportImpl(datanode, report); + processCacheReportImpl(datanode, blockIds); } finally { endTime = Time.monotonicNow(); namesystem.writeUnlock(); @@ -567,22 +567,16 @@ public final void processCacheReport(final DatanodeID datanodeID, metrics.addCacheBlockReport((int) (endTime - startTime)); } LOG.info("Processed cache report from " - + datanodeID + ", blocks: " + report.getNumberOfBlocks() + + datanodeID + ", blocks: " + blockIds.size() + ", processing time: " + (endTime - startTime) + " msecs"); } private void processCacheReportImpl(final DatanodeDescriptor datanode, - final BlockListAsLongs report) { + final List blockIds) { CachedBlocksList cached = datanode.getCached(); cached.clear(); - BlockReportIterator itBR = report.getBlockReportIterator(); - while (itBR.hasNext()) { - Block block = itBR.next(); - ReplicaState iState = itBR.getCurrentReplicaState(); - if (iState != ReplicaState.FINALIZED) { - LOG.error("Cached block report contained unfinalized block " + block); - continue; - } + for (Iterator iter = blockIds.iterator(); iter.hasNext(); ) { + Block block = new Block(iter.next()); BlockInfo blockInfo = blockManager.getStoredBlock(block); if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) { // The NameNode will eventually remove or update the out-of-date block. @@ -591,6 +585,12 @@ private void processCacheReportImpl(final DatanodeDescriptor datanode, block + ": expected genstamp " + blockInfo.getGenerationStamp()); continue; } + if (!blockInfo.isComplete()) { + LOG.warn("Ignoring block id " + block.getBlockId() + ", because " + + "it is in not complete yet. It is in state " + + blockInfo.getBlockUCState()); + continue; + } Collection corruptReplicas = blockManager.getCorruptReplicas(blockInfo); if ((corruptReplicas != null) && corruptReplicas.contains(datanode)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index c2d526086b..162a915bf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -102,7 +102,6 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; -import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -962,13 +961,14 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) @Override // DatanodeProtocol public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg, - StorageReport[] report, CacheReport[] cacheReport, int xmitsInProgress, - int xceiverCount, int failedVolumes) throws IOException { + StorageReport[] report, long dnCacheCapacity, long dnCacheUsed, + int xmitsInProgress, int xceiverCount, + int failedVolumes) throws IOException { verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(), report[0].getDfsUsed(), report[0].getRemaining(), - report[0].getBlockPoolUsed(), cacheReport[0].getCapacity(), - cacheReport[0].getUsed(), xceiverCount, xmitsInProgress, failedVolumes); + report[0].getBlockPoolUsed(), dnCacheCapacity, dnCacheUsed, + xceiverCount, xmitsInProgress, failedVolumes); } @Override // DatanodeProtocol @@ -990,15 +990,13 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg, @Override public DatanodeCommand cacheReport(DatanodeRegistration nodeReg, - String poolId, long[] blocks) throws IOException { + String poolId, List blockIds) throws IOException { verifyRequest(nodeReg); - BlockListAsLongs blist = new BlockListAsLongs(blocks); if (blockStateChangeLog.isDebugEnabled()) { blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: " - + "from " + nodeReg + " " + blist.getNumberOfBlocks() - + " blocks"); + + "from " + nodeReg + " " + blockIds.size() + " blocks"); } - namesystem.getCacheManager().processCacheReport(nodeReg, blist); + namesystem.getCacheManager().processCacheReport(nodeReg, blockIds); return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/CacheReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/CacheReport.java deleted file mode 100644 index 14e2f77a73..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/CacheReport.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.protocol; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Utilization report for a Datanode cache - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class CacheReport { - private final long capacity; - private final long used; - - public CacheReport(long capacity, long used) { - this.capacity = capacity; - this.used = used; - } - - public long getCapacity() { - return capacity; - } - - public long getUsed() { - return used; - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 9e74967dac..c990b37160 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.protocol; import java.io.*; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -106,7 +107,8 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration @Idempotent public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, StorageReport[] reports, - CacheReport[] cacheReports, + long dnCacheCapacity, + long dnCacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes) throws IOException; @@ -139,16 +141,15 @@ public DatanodeCommand blockReport(DatanodeRegistration registration, * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])}, * which is used to communicated blocks stored on disk. * - * @param registration - * @param poolId block pool ID for the blocks - * @param blocks a Long[] array from {@link BlockListAsLongs} that describes - * the list of cached blocks. This is more memory-efficient than a Block[]. - * @return + * @param The datanode registration. + * @param poolId The block pool ID for the blocks. + * @param blockIds A list of block IDs. + * @return The DatanodeCommand. * @throws IOException */ @Idempotent public DatanodeCommand cacheReport(DatanodeRegistration registration, - String poolId, long[] blocks) throws IOException; + String poolId, List blockIds) throws IOException; /** * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 76a47c6951..d64d97abca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -188,7 +188,8 @@ message HeartbeatRequestProto { optional uint32 xmitsInProgress = 3 [ default = 0 ]; optional uint32 xceiverCount = 4 [ default = 0 ]; optional uint32 failedVolumes = 5 [ default = 0 ]; - repeated CacheReportProto cacheReports = 6; + optional uint64 dnCacheCapacity = 6 [ default = 0 ]; + optional uint64 dnCacheUsed = 7 [default = 0 ]; } message StorageReportProto { @@ -200,11 +201,6 @@ message StorageReportProto { optional uint64 blockPoolUsed = 6 [ default = 0 ]; } -message CacheReportProto { - optional uint64 cacheCapacity = 1 [default = 0 ]; - optional uint64 cacheUsed = 2 [default = 0 ]; -} - /** * state - State the NN is in when returning response to the DN * txid - Highest transaction ID this NN has seen diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 217d1119ad..d5df755167 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; @@ -466,8 +467,8 @@ public synchronized BlockListAsLongs getBlockReport(String bpid) { } @Override // FsDatasetSpi - public BlockListAsLongs getCacheReport(String bpid) { - return new BlockListAsLongs(); + public List getCacheReport(String bpid) { + return new LinkedList(); } @Override // FSDatasetMBean @@ -496,17 +497,12 @@ public int getNumFailedVolumes() { } @Override // FSDatasetMBean - public long getCacheUsed() { + public long getDnCacheUsed() { return 0l; } @Override // FSDatasetMBean - public long getCacheCapacity() { - return 0l; - } - - @Override // FSDatasetMBean - public long getCacheRemaining() { + public long getDnCacheCapacity() { return 0l; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 3dec1fca56..420dfca2c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; -import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -127,7 +126,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) .when(mock).sendHeartbeat( Mockito.any(DatanodeRegistration.class), Mockito.any(StorageReport[].class), - Mockito.any(CacheReport[].class), + Mockito.anyLong(), + Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index d45f3fba50..bc78744f03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -67,7 +67,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -155,7 +154,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation) when(namenode.sendHeartbeat( Mockito.any(DatanodeRegistration.class), Mockito.any(StorageReport[].class), - Mockito.any(CacheReport[].class), + Mockito.anyLong(), + Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt())) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index ce22c951ff..86afd857f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doReturn; @@ -43,7 +44,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; -import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -109,8 +109,7 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds) fsImage.getLastAppliedOrWrittenTxId())); doReturn(response).when(spyNN).sendHeartbeat( (DatanodeRegistration) any(), - (StorageReport[]) any(), - (CacheReport[]) any(), + (StorageReport[]) any(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt()); } @@ -166,15 +165,11 @@ private static long[] getBlockSizes(HdfsBlockLocation[] locs) * Blocks until cache usage hits the expected new value. */ private long verifyExpectedCacheUsage(final long expected) throws Exception { - long cacheUsed = fsd.getCacheUsed(); + long cacheUsed = fsd.getDnCacheUsed(); while (cacheUsed != expected) { - cacheUsed = fsd.getCacheUsed(); + cacheUsed = fsd.getDnCacheUsed(); Thread.sleep(100); } - long cacheCapacity = fsd.getCacheCapacity(); - long cacheRemaining = fsd.getCacheRemaining(); - assertEquals("Sum of used and remaining cache does not equal total", - cacheCapacity, cacheUsed+cacheRemaining); assertEquals("Unexpected amount of cache used", expected, cacheUsed); return cacheUsed; } @@ -195,8 +190,8 @@ public void testCacheAndUncacheBlock() throws Exception { final long[] blockSizes = getBlockSizes(locs); // Check initial state - final long cacheCapacity = fsd.getCacheCapacity(); - long cacheUsed = fsd.getCacheUsed(); + final long cacheCapacity = fsd.getDnCacheCapacity(); + long cacheUsed = fsd.getDnCacheUsed(); long current = 0; assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Unexpected amount of cache used", current, cacheUsed); @@ -257,7 +252,7 @@ public void testFilesExceedMaxLockedMemory() throws Exception { // Uncache the cached part of the nth file setHeartbeatResponse(uncacheBlocks(fileLocs[numFiles-1])); - while (fsd.getCacheUsed() != oldCurrent) { + while (fsd.getDnCacheUsed() != oldCurrent) { Thread.sleep(100); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index e093e9bbab..55bf955f17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -845,9 +845,8 @@ void sendHeartbeat() throws IOException { // TODO:FEDERATION currently a single block pool is supported StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(), false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; - CacheReport[] cacheRep = { new CacheReport(0l, 0l) }; - DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, - rep, cacheRep, 0, 0, 0).getCommands(); + DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep, + 0L, 0L, 0, 0, 0).getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { if(LOG.isDebugEnabled()) { @@ -893,9 +892,8 @@ int replicateBlocks() throws IOException { // register datanode StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(), false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; - CacheReport[] cacheRep = { new CacheReport(0l, 0l) }; DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, - rep, cacheRep, 0, 0, 0).getCommands(); + rep, 0L, 0L, 0, 0, 0).getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index ddcedcf44f..6eed13e1be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; -import org.apache.hadoop.hdfs.server.protocol.CacheReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -143,8 +142,7 @@ public void testDeadDatanode() throws Exception { // that asks datanode to register again StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0, 0, 0) }; - CacheReport[] cacheRep = { new CacheReport(0l, 0l) }; - DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, cacheRep, 0, 0, 0) + DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0) .getCommands(); assertEquals(1, cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER