HDFS-5051. Propagate cache status information from the DataNode to the NameNode (Andrew Wang via Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1513653 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 2013-08-13 21:05:09 +00:00
parent 2a4031940c
commit 52ccc6c6d5
12 changed files with 160 additions and 7 deletions


@ -9,6 +9,9 @@ HDFS-4949 (Unreleased)
IMPROVEMENTS
HDFS-5049. Add JNI mlock support. (Andrew Wang via Colin Patrick McCabe)
HDFS-5051. Propagate cache status information from the DataNode to the
NameNode (Andrew Wang via Colin Patrick McCabe)
OPTIMIZATIONS
BUG FIXES


@ -354,6 +354,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 60 * 60 * 1000;
public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
public static final int DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 1000;
public static final String DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";


@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -36,6 +37,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
@ -202,6 +205,29 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
}
@Override
public DatanodeCommand cacheReport(DatanodeRegistration registration,
String poolId, long[] blocks) throws IOException {
CacheReportRequestProto.Builder builder =
CacheReportRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
for (int i=0; i<blocks.length; i++) {
builder.addBlocks(blocks[i]);
}
CacheReportResponseProto resp;
try {
resp = rpcProxy.cacheReport(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
if (resp.hasCmd()) {
return PBHelper.convert(resp.getCmd());
}
return null;
}
@Override
public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)


@ -27,6 +27,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
@ -55,6 +57,7 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import com.google.common.primitives.Longs;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -159,6 +162,27 @@ public BlockReportResponseProto blockReport(RpcController controller,
return builder.build();
}
@Override
public CacheReportResponseProto cacheReport(RpcController controller,
CacheReportRequestProto request) throws ServiceException {
DatanodeCommand cmd = null;
try {
cmd = impl.cacheReport(
PBHelper.convert(request.getRegistration()),
request.getBlockPoolId(),
Longs.toArray(request.getBlocksList()));
} catch (IOException e) {
throw new ServiceException(e);
}
CacheReportResponseProto.Builder builder =
CacheReportResponseProto.newBuilder();
if (cmd != null) {
builder.setCmd(PBHelper.convert(cmd));
}
return builder.build();
}
@Override
public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
RpcController controller, BlockReceivedAndDeletedRequestProto request)


@ -1664,6 +1664,15 @@ public void processReport(final DatanodeID nodeID, final String poolId,
+ ", processing time: " + (endTime - startTime) + " msecs");
}
/**
* The given datanode is reporting all of its cached blocks.
* Update the cache state of blocks in the block map.
*/
public void processCacheReport(final DatanodeID nodeID, final String poolId,
final BlockListAsLongs newReport) throws IOException {
// TODO: Implement me!
}
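The cache-state update itself is deferred (the TODO above); a minimal sketch of where this method is headed, mirroring the locking and node lookup in processReport, with a hypothetical per-descriptor cached-block list that this commit does not yet add:

public void processCacheReport(final DatanodeID nodeID, final String poolId,
    final BlockListAsLongs newReport) throws IOException {
  namesystem.writeLock();
  try {
    final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
    if (node == null || !node.isAlive) {
      throw new IOException("Cache report from dead or unregistered node: "
          + nodeID.getXferAddr());
    }
    // Walk the reported longs and record each block as cached on this node.
    BlockListAsLongs.BlockReportIterator itBR = newReport.getBlockReportIterator();
    while (itBR.hasNext()) {
      Block block = itBR.next();
      // node.addCachedBlock(blocksMap.getStoredBlock(block)); // hypothetical
    }
  } finally {
    namesystem.writeUnlock();
  }
}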
/**
* Rescan the list of blocks which were previously postponed.
*/


@ -368,6 +368,12 @@ void scheduleBlockReport(long delay) {
}
}
void scheduleCacheReport(long delay) {
for (BPServiceActor actor: bpServices) {
actor.scheduleCacheReport(delay);
}
}
/**
* Ask each of the actors to report a bad block hosted on another DN.
*/


@ -84,6 +84,8 @@ class BPServiceActor implements Runnable {
boolean resetBlockReportTime = true;
volatile long lastCacheReport = 0;
Thread bpThread;
DatanodeProtocolClientSideTranslatorPB bpNamenode;
private volatile long lastHeartbeat = 0;
@ -239,6 +241,17 @@ void scheduleBlockReport(long delay) {
resetBlockReportTime = true; // reset future BRs for randomness
}
void scheduleCacheReport(long delay) {
if (delay > 0) {
// Uniform random jitter by the delay
lastCacheReport = Time.monotonicNow()
- dnConf.cacheReportInterval
+ DFSUtil.getRandom().nextInt((int)delay);
} else { // send at next heartbeat
lastCacheReport = lastCacheReport - dnConf.cacheReportInterval;
}
}
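To make the jitter concrete: with the 10 000 ms default cacheReportInterval and delay = 4000, lastCacheReport lands uniformly in [now - 10 000 ms, now - 6001 ms], so the expiry check in cacheReport() below fires within the next four seconds rather than synchronizing every DataNode's first report. With delay <= 0, backing the timestamp up by a full interval leaves the timer already expired, so the report goes out on the next heartbeat loop iteration.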
void reportBadBlocks(ExtendedBlock block) {
if (bpRegistration == null) {
return;
@ -430,6 +443,15 @@ DatanodeCommand blockReport() throws IOException {
return cmd;
}
DatanodeCommand cacheReport() throws IOException {
// send cache report if timer has expired.
DatanodeCommand cmd = null;
long startTime = Time.monotonicNow();
if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
// TODO: Implement me!
}
return cmd;
}
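When the TODO lands, the body will presumably parallel blockReport(): pull the cached-block list from the dataset, ship it through the bpNamenode translator, and reset the timer. A sketch under the assumption of a hypothetical FsDatasetSpi getCacheReport accessor, which is not part of this commit:

DatanodeCommand cacheReport() throws IOException {
  DatanodeCommand cmd = null;
  long startTime = Time.monotonicNow();
  if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
    // hypothetical accessor: the blocks currently mlock()ed by this DN
    BlockListAsLongs blocks =
        dn.getFSDataset().getCacheReport(bpos.getBlockPoolId());
    cmd = bpNamenode.cacheReport(bpRegistration, bpos.getBlockPoolId(),
        blocks.getBlockListAsLongs());
    lastCacheReport = startTime;
  }
  return cmd;
}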
HeartbeatResponse sendHeartBeat() throws IOException {
if (LOG.isDebugEnabled()) {
@ -496,11 +518,12 @@ private synchronized void cleanUp() {
* forever calling remote NameNode functions.
*/
private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+ dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+ dnConf.blockReportInterval + "msec" + " Initial delay: "
+ dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ dnConf.heartBeatInterval);
LOG.info("For namenode " + nnAddr + " using"
+ " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+ " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
+ "; heartBeatInterval=" + dnConf.heartBeatInterval);
//
// Now loop for a long time....
@ -555,6 +578,9 @@ private void offerService() throws Exception {
DatanodeCommand cmd = blockReport();
processCommand(new DatanodeCommand[]{ cmd });
cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
// Now safe to start scanning the block pool.
// If it has already been started, this is a no-op.
if (dn.blockScanner != null) {


@ -70,6 +70,7 @@ public class DNConf {
final long blockReportInterval;
final long deleteReportInterval;
final long initialBlockReportDelay;
final long cacheReportInterval;
final int writePacketSize;
final String minimumNameNodeVersion;
@ -114,6 +115,8 @@ public DNConf(Configuration conf) {
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
long initBRDelay = conf.getLong(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,

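The new interval is tunable through the dfs.cachereport.intervalMsec key defined in DFSConfigKeys above; a test or deployment could override the ten-second default programmatically, for example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// report cached blocks every 30 seconds instead of the 10-second default
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 30 * 1000L);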

@ -1916,6 +1916,7 @@ static StartupOption getStartupOption(Configuration conf) {
public void scheduleAllBlockReport(long delay) {
for(BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
bpos.scheduleBlockReport(delay);
bpos.scheduleCacheReport(delay);
}
}


@ -951,6 +951,18 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
return null;
}
@Override
public DatanodeCommand cacheReport(DatanodeRegistration nodeReg,
String poolId, long[] blocks) throws IOException {
verifyRequest(nodeReg);
BlockListAsLongs blist = new BlockListAsLongs(blocks);
namesystem.getBlockManager().processCacheReport(nodeReg, poolId, blist);
if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState()) {
return new FinalizeCommand(poolId);
}
return null;
}
@Override // DatanodeProtocol
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {


@ -22,10 +22,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.KerberosInfo;
@ -128,6 +128,25 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException;
/**
* Communicates the complete list of locally cached blocks to the NameNode.
*
* This method is similar to
* {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
* which is used to communicate blocks stored on disk.
*
* @param registration the datanode registration
* @param poolId block pool ID for the blocks
* @param blocks a long[] array from {@link BlockListAsLongs} that describes
* the list of cached blocks. This is more memory-efficient than a Block[].
* @return a {@link DatanodeCommand} for the DataNode to act on, or null
* @throws IOException
*/
@Idempotent
public DatanodeCommand cacheReport(DatanodeRegistration registration,
String poolId, long[] blocks) throws IOException;
/**
* blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
* recently-received and -deleted block data.


@ -205,9 +205,11 @@ message HeartbeatResponseProto {
/**
* registration - datanode registration information
* blockPoolID - block pool ID of the reported blocks
* blocks - each block is represented as two longs in the array.
* blocks - each block is represented as multiple longs in the array.
* first long represents block ID
* second long represents length
* third long represents gen stamp
* fourth long (if under construction) represents replica state
*/
message BlockReportRequestProto {
required DatanodeRegistrationProto registration = 1;
@ -230,6 +232,21 @@ message BlockReportResponseProto {
optional DatanodeCommandProto cmd = 1;
}
/**
* registration - datanode registration information
* blockPoolId - block pool ID of the reported blocks
* blocks - representation of blocks as longs for efficiency reasons
*/
message CacheReportRequestProto {
required DatanodeRegistrationProto registration = 1;
required string blockPoolId = 2;
repeated uint64 blocks = 3 [packed=true];
}
message CacheReportResponseProto {
optional DatanodeCommandProto cmd = 1;
}
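The cache report reuses the longs-based encoding rather than structured Block messages, as the javadoc on DatanodeProtocol#cacheReport notes. Purely as an illustration (assuming the era's BlockListAsLongs layout of a two-long header followed by one blockId/length/genstamp triplet per finalized replica), a report of two cached blocks might be packed as:

// hypothetical payload for CacheReportRequestProto.blocks
long[] blocks = new long[] {
    2, 0,                            // header: 2 finalized replicas, 0 under construction
    1073741825L, 134217728L, 1001L,  // block ID, byte length, generation stamp
    1073741826L, 67108864L, 1002L
};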
/**
* Data structure to send received or deleted block information
* from datanode to namenode.
@ -347,6 +364,11 @@ service DatanodeProtocolService {
*/
rpc blockReport(BlockReportRequestProto) returns(BlockReportResponseProto);
/**
* Report cached blocks at a datanode to the namenode
*/
rpc cacheReport(CacheReportRequestProto) returns(CacheReportResponseProto);
/**
* Incremental block report from the DN. This contains info about recently
* received and deleted blocks, as well as when blocks start being