From 846f97312c6db7b84b7401174acd0fc943baa093 Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Mon, 30 Jan 2012 19:16:15 +0000
Subject: [PATCH] HDFS-2691. Fixes for pipeline recovery in an HA cluster:
 report RBW replicas immediately upon pipeline creation. Contributed by Todd
 Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1237935 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES.HDFS-1623.txt             |   2 +
 .../hadoop/hdfs/protocolPB/PBHelper.java          |  34 ++-
 .../server/blockmanagement/BlockManager.java      |  63 +++--
 .../hdfs/server/datanode/BPOfferService.java      |  25 +-
 .../hdfs/server/datanode/BPServiceActor.java      |   8 +-
 .../hdfs/server/datanode/BlockReceiver.java       |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java     |  15 +-
 .../hdfs/server/namenode/FSNamesystem.java        |   2 +-
 .../server/namenode/NameNodeRpcServer.java        |   2 +-
 .../protocol/ReceivedDeletedBlockInfo.java        |  56 ++++-
 .../ReceivedDeletedBlockInfoWritable.java         |  31 ++-
 .../src/main/proto/DatanodeProtocol.proto         |  14 +-
 .../apache/hadoop/hdfs/AppendTestUtil.java        |  10 +-
 .../namenode/NNThroughputBenchmark.java           |   6 +-
 .../server/namenode/TestDeadDatanode.java         |   4 +-
 .../namenode/ha/TestPipelinesFailover.java        | 237 ++++++++++++++++++
 16 files changed, 456 insertions(+), 56 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
index 2ae00c763f..dd4fa42ffd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
@@ -137,3 +137,5 @@ HDFS-2838. NPE in FSNamesystem when in safe mode. (Gregory Chanan via eli)
 HDFS-2805. Add a test for a federated cluster with HA NNs. (Brandon Li via jitendra)
 
 HDFS-2841. HAAdmin does not work if security is enabled. (atm)
+
+HDFS-2691. Fixes for pipeline recovery in an HA cluster: report RBW replicas immediately upon pipeline creation. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index af224f34cb..2b2d0000fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -813,6 +814,23 @@ public static ReceivedDeletedBlockInfoProto convert(
     ReceivedDeletedBlockInfoProto.Builder builder = 
         ReceivedDeletedBlockInfoProto.newBuilder();
     
+    ReceivedDeletedBlockInfoProto.BlockStatus status;
+    switch (receivedDeletedBlockInfo.getStatus()) {
+    case RECEIVING_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVING;
+      break;
+    case RECEIVED_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVED;
+      break;
+    case DELETED_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.DELETED;
+      break;
+    default:
+      throw new IllegalArgumentException("Bad status: " +
+          receivedDeletedBlockInfo.getStatus());
+    }
+    builder.setStatus(status);
+
     if (receivedDeletedBlockInfo.getDelHints() != null) {
       builder.setDeleteHint(receivedDeletedBlockInfo.getDelHints());
     }
@@ -844,7 +862,21 @@ public static UpgradeCommandProto convert(UpgradeCommand comm) {
 
   public static ReceivedDeletedBlockInfo convert(
       ReceivedDeletedBlockInfoProto proto) {
-    return new ReceivedDeletedBlockInfo(PBHelper.convert(proto.getBlock()),
+    ReceivedDeletedBlockInfo.BlockStatus status = null;
+    switch (proto.getStatus()) {
+    case RECEIVING:
+      status = BlockStatus.RECEIVING_BLOCK;
+      break;
+    case RECEIVED:
+      status = BlockStatus.RECEIVED_BLOCK;
+      break;
+    case DELETED:
+      status = BlockStatus.DELETED_BLOCK;
+      break;
+    }
+    return new ReceivedDeletedBlockInfo(
+        PBHelper.convert(proto.getBlock()),
+        status,
         proto.hasDeleteHint() ? proto.getDeleteHint() : null);
   }
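The two convert() overloads above must remain exact inverses, or a status would silently change as it crosses the protobuf boundary. A minimal round-trip check, sketched as a JUnit 4 test (the test name and the delHint value are illustrative, not part of this patch):

    @Test
    public void testConvertReceivedDeletedBlockInfo() {
      for (BlockStatus status : BlockStatus.values()) {
        ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
            new Block(1, 100, 1001), status,
            status == BlockStatus.RECEIVED_BLOCK ? "some-datanode" : null);
        ReceivedDeletedBlockInfoProto proto = PBHelper.convert(info);
        // Relies on the status-aware equals() added later in this patch.
        assertEquals(info, PBHelper.convert(proto));
      }
    }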
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 59cbeab439..9f2dfba55e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2256,13 +2256,19 @@ void addBlock(DatanodeDescriptor node, Block block, String delHint)
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-
+    processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
+        delHintNode);
+  }
+
+  private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
+      ReplicaState reportedState, DatanodeDescriptor delHintNode)
+      throws IOException {
     // blockReceived reports a finalized block
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, block, ReplicaState.FINALIZED,
+    processReportedBlock(node, block, reportedState,
         toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2286,47 +2292,66 @@
     }
   }
 
-  /** The given node is reporting that it received/deleted certain blocks. */
-  public void blockReceivedAndDeleted(final DatanodeID nodeID,
+  /**
+   * The given node is reporting incremental information about some blocks.
+   * This includes blocks that are starting to be received, completed being
+   * received, or deleted.
+   */
+  public void processIncrementalBlockReport(final DatanodeID nodeID,
       final String poolId,
-      final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
+      final ReceivedDeletedBlockInfo blockInfos[]
   ) throws IOException {
     namesystem.writeLock();
     int received = 0;
     int deleted = 0;
+    int receiving = 0;
     try {
       final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
         NameNode.stateChangeLog
-            .warn("BLOCK* blockReceivedDeleted"
+            .warn("BLOCK* processIncrementalBlockReport"
                 + " is received from dead or unregistered node "
                 + nodeID.getName());
         throw new IOException(
-            "Got blockReceivedDeleted message from unregistered or dead node");
+            "Got incremental block report from unregistered or dead node");
       }
-      for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
-        if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
-          removeStoredBlock(
-              receivedAndDeletedBlocks[i].getBlock(), node);
+
+      for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+        switch (rdbi.getStatus()) {
+        case DELETED_BLOCK:
+          removeStoredBlock(rdbi.getBlock(), node);
           deleted++;
-        } else {
-          addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
-              receivedAndDeletedBlocks[i].getDelHints());
+          break;
+        case RECEIVED_BLOCK:
+          addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
           received++;
+          break;
+        case RECEIVING_BLOCK:
+          receiving++;
+          processAndHandleReportedBlock(node, rdbi.getBlock(),
+              ReplicaState.RBW, null);
+          break;
+        default:
+          String msg = 
+            "Unknown block status code reported by " + nodeID.getName() +
+            ": " + rdbi;
+          NameNode.stateChangeLog.warn(msg);
+          assert false : msg; // if assertions are enabled, throw.
+          break;
         }
         if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* block"
-              + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
-              : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+          NameNode.stateChangeLog.debug("BLOCK* block "
+              + (rdbi.getStatus()) + ": " + rdbi.getBlock()
              + " is received from " + nodeID.getName());
        }
      }
    } finally {
      namesystem.writeUnlock();
      NameNode.stateChangeLog
-          .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
-              + nodeID.getName() + " received: " + received + ", "
+          .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+              + nodeID.getName()
+              + " receiving: " + receiving + ", "
+              + " received: " + received + ", "
              + " deleted: " + deleted);
    }
  }
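The switch above gives each of the three statuses its own action on the NameNode: a deletion removes the replica location, a finalized replica is added (possibly with a hint naming the node that now holds an excess replica), and a new RBW replica is fed through processAndHandleReportedBlock() so the location is known while the block is still being written. The corresponding entry shapes, as the DataNode builds them later in this patch (blk and delHint are placeholders):

    // RBW replica created: the NN learns the location before finalization.
    new ReceivedDeletedBlockInfo(blk, BlockStatus.RECEIVING_BLOCK, null);
    // Replica finalized: delHint may name a node holding an excess replica.
    new ReceivedDeletedBlockInfo(blk, BlockStatus.RECEIVED_BLOCK, delHint);
    // Replica deleted locally: the NN drops this location from its maps.
    new ReceivedDeletedBlockInfo(blk, BlockStatus.DELETED_BLOCK, null);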
"Deleted" - : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock() + NameNode.stateChangeLog.debug("BLOCK* block " + + (rdbi.getStatus()) + ": " + rdbi.getBlock() + " is received from " + nodeID.getName()); } } } finally { namesystem.writeUnlock(); NameNode.stateChangeLog - .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from " - + nodeID.getName() + " received: " + received + ", " + .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from " + + nodeID.getName() + + " receiving: " + receiving + ", " + + " received: " + received + ", " + " deleted: " + deleted); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 34c123cee7..27df1f2de1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import com.google.common.annotations.VisibleForTesting; @@ -202,10 +203,13 @@ void reportBadBlocks(ExtendedBlock block) { void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) { checkBlock(block); checkDelHint(delHint); - ReceivedDeletedBlockInfo bInfo = - new ReceivedDeletedBlockInfo(block.getLocalBlock(), delHint); + ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( + block.getLocalBlock(), + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, + delHint); + for (BPServiceActor actor : bpServices) { - actor.notifyNamenodeReceivedBlock(bInfo); + actor.notifyNamenodeBlockImmediately(bInfo); } } @@ -224,13 +228,24 @@ private void checkDelHint(String delHint) { void notifyNamenodeDeletedBlock(ExtendedBlock block) { checkBlock(block); - ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(block - .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT); + ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( + block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null); for (BPServiceActor actor : bpServices) { actor.notifyNamenodeDeletedBlock(bInfo); } } + + void notifyNamenodeReceivingBlock(ExtendedBlock block) { + checkBlock(block); + ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( + block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null); + + for (BPServiceActor actor : bpServices) { + actor.notifyNamenodeBlockImmediately(bInfo); + } + } + //This must be called only by blockPoolManager void start() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index ead3e143a5..e591676336 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -267,7 +267,7 @@ private void reportReceivedDeletedBlocks() throws IOException { * till namenode is informed before responding with success 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index c8aac296a7..09706cab85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -153,6 +153,7 @@ class BlockReceiver implements Closeable {
       switch (stage) {
       case PIPELINE_SETUP_CREATE:
         replicaInfo = datanode.data.createRbw(block);
+        datanode.notifyNamenodeReceivingBlock(block);
         break;
       case PIPELINE_SETUP_STREAMING_RECOVERY:
         replicaInfo = datanode.data.recoverRbw(
@@ -166,6 +167,7 @@ class BlockReceiver implements Closeable {
               block.getLocalBlock());
         }
         block.setGenerationStamp(newGs);
+        datanode.notifyNamenodeReceivingBlock(block);
         break;
       case PIPELINE_SETUP_APPEND_RECOVERY:
         replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -174,6 +176,7 @@ class BlockReceiver implements Closeable {
               block.getLocalBlock());
         }
         block.setGenerationStamp(newGs);
+        datanode.notifyNamenodeReceivingBlock(block);
         break;
       case TRANSFER_RBW:
       case TRANSFER_FINALIZED:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3ac89aec6c..726010e653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -522,7 +522,18 @@ protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint)
     if(bpos != null) {
       bpos.notifyNamenodeReceivedBlock(block, delHint); 
     } else {
-      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+      LOG.error("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
     }
   }
+
+  // calls specific to BP
+  protected void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if(bpos != null) {
+      bpos.notifyNamenodeReceivingBlock(block); 
+    } else {
+      LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
+          + block.getBlockPoolId());
+    }
+  }
 
@@ -533,7 +544,7 @@ protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
     if (bpos != null) {
       bpos.notifyNamenodeDeletedBlock(block);
     } else {
-      LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+      LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
           + block.getBlockPoolId());
     }
   }
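With the three notifyNamenodeReceivingBlock() hooks above, every pipeline setup path (create, streaming recovery, append recovery) reports the RBW replica right away instead of waiting for finalization. From a writer's perspective the sequence is (a sketch; fs, path and data are placeholders):

    FSDataOutputStream out = fs.create(path); // pipeline setup: each DN sends
                                              // RECEIVING_BLOCK for the new replica
    out.write(data);
    out.hflush();                             // data readable; replicas still RBW
    out.close();                              // finalization: each DN sends
                                              // RECEIVED_BLOCK

This is what lets a standby NameNode learn the locations of under-construction blocks, so a pipeline built while the other node was active can still be recovered after a failover.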
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b3b3dbdaf3..aef137c365 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -4933,7 +4933,7 @@ public void notifyGenStampUpdate(long gs) {
             + m.getNodeReg().getName() + " "
             + m.getReceivedAndDeletedBlocks().length + " blocks.");
       }
-      this.getBlockManager().blockReceivedAndDeleted(m.getNodeReg(),
+      this.getBlockManager().processIncrementalBlockReport(m.getNodeReg(),
           m.getPoolId(), m.getReceivedAndDeletedBlocks());
       break;
     case BLOCK_REPORT:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a0d7e14897..5920762ac8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -928,7 +928,7 @@ public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
           +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
-    namesystem.getBlockManager().blockReceivedAndDeleted(
+    namesystem.getBlockManager().processIncrementalBlockReport(
         nodeReg, poolId, receivedAndDeletedBlocks);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
index 45014add97..bde5a5e2d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
@@ -25,22 +25,47 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
- * A data structure to store Block and delHints together, used to send
- * received/deleted ACKs.
+ * A data structure to store the blocks in an incremental block report.
  */
 public class ReceivedDeletedBlockInfo implements Writable {
   Block block;
+  BlockStatus status;
   String delHints;
 
-  public final static String TODELETE_HINT = "-";
+  public static enum BlockStatus {
+    RECEIVING_BLOCK(1),
+    RECEIVED_BLOCK(2),
+    DELETED_BLOCK(3);
+
+    private final int code;
+    BlockStatus(int code) {
+      this.code = code;
+    }
+
+    public int getCode() {
+      return code;
+    }
+
+    public static BlockStatus fromCode(int code) {
+      for (BlockStatus bs : BlockStatus.values()) {
+        if (bs.code == code) {
+          return bs;
+        }
+      }
+      return null;
+    }
+  }
 
   public ReceivedDeletedBlockInfo() {
   }
 
-  public ReceivedDeletedBlockInfo(Block blk, String delHints) {
+  public ReceivedDeletedBlockInfo(
+      Block blk, BlockStatus status, String delHints) {
     this.block = blk;
+    this.status = status;
     this.delHints = delHints;
   }
@@ -60,13 +85,19 @@ public void setDelHints(String hints) {
     this.delHints = hints;
   }
 
+  public BlockStatus getStatus() {
+    return status;
+  }
+
   public boolean equals(Object o) {
     if (!(o instanceof ReceivedDeletedBlockInfo)) {
       return false;
     }
     ReceivedDeletedBlockInfo other = (ReceivedDeletedBlockInfo) o;
     return this.block.equals(other.getBlock())
-        && this.delHints.equals(other.delHints);
+        && this.status == other.status
+        && (this.delHints == other.delHints ||
+            this.delHints != null && this.delHints.equals(other.delHints));
   }
 
   public int hashCode() {
@@ -79,23 +110,30 @@ public boolean blockEquals(Block b) {
   }
 
   public boolean isDeletedBlock() {
-    return delHints.equals(TODELETE_HINT);
+    return status == BlockStatus.DELETED_BLOCK;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     this.block.write(out);
-    Text.writeString(out, this.delHints);
+    WritableUtils.writeVInt(out, this.status.code);
+    if (this.status == BlockStatus.DELETED_BLOCK) {
+      Text.writeString(out, this.delHints);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     this.block = new Block();
     this.block.readFields(in);
-    this.delHints = Text.readString(in);
+    this.status = BlockStatus.fromCode(WritableUtils.readVInt(in));
+    if (this.status == BlockStatus.DELETED_BLOCK) {
+      this.delHints = Text.readString(in);
+    }
   }
 
   public String toString() {
-    return block.toString() + ", delHint: " + delHints;
+    return block.toString() + ", status: " + status +
+        ", delHint: " + delHints;
   }
 }
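fromCode() is the decoding half of the vint written by write(); it deliberately returns null for an unrecognized code instead of throwing, which is what the default branch in BlockManager.processIncrementalBlockReport() guards against. A two-line sanity check (illustrative only):

    for (BlockStatus bs : BlockStatus.values()) {
      assertEquals(bs, BlockStatus.fromCode(bs.getCode()));
    }
    assertNull(BlockStatus.fromCode(42)); // unknown wire codes decode to null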
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java
index 5d37890c7f..02bf84c72a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReceivedDeletedBlockInfoWritable.java
@@ -24,8 +24,10 @@
 
 import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A data structure to store Block and delHints together, used to send
@@ -33,33 +35,43 @@
  */
 public class ReceivedDeletedBlockInfoWritable implements Writable {
   BlockWritable block;
+  int statusCode;
   String delHints;
 
-  public final static String TODELETE_HINT = "-";
-
   public ReceivedDeletedBlockInfoWritable() {
   }
 
-  public ReceivedDeletedBlockInfoWritable(BlockWritable blk, String delHints) {
+  public ReceivedDeletedBlockInfoWritable(
+      BlockWritable blk, int statusCode, String delHints) {
     this.block = blk;
+    this.statusCode = statusCode;
     this.delHints = delHints;
   }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     this.block.write(out);
-    Text.writeString(out, this.delHints);
+    WritableUtils.writeVInt(out, this.statusCode);
+    if (this.statusCode == BlockStatus.DELETED_BLOCK.getCode()) {
+      Text.writeString(out, this.delHints);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     this.block = new BlockWritable();
     this.block.readFields(in);
-    this.delHints = Text.readString(in);
+    this.statusCode = WritableUtils.readVInt(in);
+    if (this.statusCode == BlockStatus.DELETED_BLOCK.getCode()) {
+      this.delHints = Text.readString(in);
+    }
   }
 
   public String toString() {
-    return block.toString() + ", delHint: " + delHints;
+    return block.toString() + ", statusCode: " + statusCode +
+        ", delHint: " + delHints;
   }
 
   public static ReceivedDeletedBlockInfo[] convert(
@@ -83,13 +95,16 @@
   }
 
   public ReceivedDeletedBlockInfo convert() {
-    return new ReceivedDeletedBlockInfo(block.convert(), delHints);
+    return new ReceivedDeletedBlockInfo(block.convert(),
+        BlockStatus.fromCode(statusCode), delHints);
   }
 
   public static ReceivedDeletedBlockInfoWritable convert(
       ReceivedDeletedBlockInfo b) {
     if (b == null) return null;
-    return new ReceivedDeletedBlockInfoWritable(BlockWritable.convert(b
-        .getBlock()), b.getDelHints());
+    return new ReceivedDeletedBlockInfoWritable(
+        BlockWritable.convert(b.getBlock()),
+        b.getStatus().getCode(),
+        b.getDelHints());
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 6426de95ba..124bb5514e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -213,12 +213,16 @@ message BlockReportResponseProto {
 /**
  * Data structure to send received or deleted block information
  * from datanode to namenode.
- *
- * deleteHint set to "-" indicates block deletion.
- * other deleteHint indicates block addition.
  */
 message ReceivedDeletedBlockInfoProto {
+  enum BlockStatus {
+    RECEIVING = 1; // block being created
+    RECEIVED = 2; // block creation complete
+    DELETED = 3;
+  }
+
   required BlockProto block = 1;
+  required BlockStatus status = 3;
   optional string deleteHint = 2;
 }
@@ -329,7 +333,9 @@ service DatanodeProtocolService {
   rpc blockReport(BlockReportRequestProto) returns(BlockReportResponseProto);
 
   /**
-   * Report from datanode about recently received or deleted block
+   * Incremental block report from the DN. This contains info about recently
+   * received and deleted blocks, as well as when blocks start being
+   * received.
    */
   rpc blockReceivedAndDeleted(BlockReceivedAndDeletedRequestProto)
       returns(BlockReceivedAndDeletedResponseProto);
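In the proto message, status takes field number 3 because deleteHint already owns 2 and protobuf field numbers are never reused; new fields simply get the next free tag. Building one entry directly against the generated classes looks roughly like this (a sketch; the Block values are placeholders, and PBHelper.convert(Block) is the pre-existing block converter):

    ReceivedDeletedBlockInfoProto proto =
        ReceivedDeletedBlockInfoProto.newBuilder()
            .setBlock(PBHelper.convert(new Block(1, 0, 1001)))
            .setStatus(ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVING)
            .build(); // deleteHint stays unset for a RECEIVING entry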
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
index 478c790941..f28648189d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
@@ -113,8 +113,14 @@ public static void check(FileSystem fs, Path p, long length) throws IOException
     int i = -1;
     try {
       final FileStatus status = fs.getFileStatus(p);
-      TestCase.assertEquals(length, status.getLen());
-      InputStream in = fs.open(p);
+      FSDataInputStream in = fs.open(p);
+      if (in.getWrappedStream() instanceof DFSInputStream) {
+        long len = ((DFSInputStream)in.getWrappedStream()).getFileLength();
+        TestCase.assertEquals(length, len);
+      } else {
+        TestCase.assertEquals(length, status.getLen());
+      }
+
       for(i++; i < length; i++) {
         TestCase.assertEquals((byte)i, (byte)in.read());
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 7d15900756..ae7e80676e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -884,7 +884,8 @@ private int transferBlocks( Block blocks[],
         nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
             .getNamesystem().getBlockPoolId(),
             new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
-                blocks[i], DataNode.EMPTY_DEL_HINT) });
+                blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
+                null) });
       }
     }
     return blocks.length;
@@ -999,7 +1000,8 @@ private ExtendedBlock addBlocks(String fileName, String clientName)
         nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
             .getBlock().getBlockPoolId(),
             new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
-                .getBlock().getLocalBlock(), "") });
+                .getBlock().getLocalBlock(),
+                ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) });
       }
     }
     return prevBlock;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 33a7129457..54df2c7765 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -107,7 +107,9 @@ public void testDeadDatanode() throws Exception {
     DatanodeProtocol dnp = cluster.getNameNodeRpc();
 
     ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
-        new Block(0), "") };
+        new Block(0),
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
+        null) };
 
     // Ensure blockReceived call from dead datanode is rejected with IOException
     try {
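The AppendTestUtil change matters for the HA tests that follow: right after a failover, the NameNode's metadata may not yet reflect the final length of a file whose last block is still under construction, so getFileStatus() can under-report. The new check therefore prefers the length computed by the client-side stream, which consults the DataNodes for the visible length of the in-progress last block. Condensed (the variable names follow the patch; error handling elided):

    FSDataInputStream in = fs.open(p);
    InputStream wrapped = in.getWrappedStream();
    long len = (wrapped instanceof DFSInputStream)
        ? ((DFSInputStream) wrapped).getFileLength() // includes in-progress last block
        : fs.getFileStatus(p).getLen();              // NN metadata only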
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
new file mode 100644
index 0000000000..ce7347cdf0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Level;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test cases regarding pipeline recovery during NN failover.
+ */
+public class TestPipelinesFailover {
+  static {
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(
+        "org.apache.hadoop.io.retry.RetryInvocationHandler")).getLogger().setLevel(Level.ALL);
+
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+  }
+
+  protected static final Log LOG = LogFactory.getLog(
+      TestPipelinesFailover.class);
+  private static final Path TEST_PATH =
+      new Path("/test-file");
+  private static final int BLOCK_SIZE = 4096;
+  private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;
+
+  /**
+   * Tests continuing a write pipeline over a failover.
+   */
+  @Test(timeout=30000)
+  public void testWriteOverFailover() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // Don't check replication periodically.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
+
+    FSDataOutputStream stm = null;
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(3)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      Thread.sleep(500);
+
+      LOG.info("Starting with NN 0 active");
+      FileSystem fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
+      stm = fs.create(TEST_PATH);
+
+      // write a block and a half
+      AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
+
+      // Make sure all of the blocks are written out before failover.
+      stm.hflush();
+
+      LOG.info("Failing over to NN 1");
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+
+      assertTrue(fs.exists(TEST_PATH));
+      FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
+      BlockManagerTestUtil.updateState(ns1.getBlockManager());
+      assertEquals(0, ns1.getPendingReplicationBlocks());
+      assertEquals(0, ns1.getCorruptReplicaBlocks());
+      assertEquals(0, ns1.getMissingBlocksCount());
+
+      // write another block and a half
+      AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
+
+      stm.close();
+      stm = null;
+
+      AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE * 3);
+    } finally {
+      IOUtils.closeStream(stm);
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests continuing a write pipeline over a failover when a DN fails
+   * after the failover - ensures that updating the pipeline succeeds
+   * even when the pipeline was constructed on a different NN.
+   */
+  @Test(timeout=30000)
+  public void testWriteOverFailoverWithDnFail() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+    FSDataOutputStream stm = null;
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(5)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      Thread.sleep(500);
+
+      LOG.info("Starting with NN 0 active");
+      FileSystem fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
+      stm = fs.create(TEST_PATH);
+
+      // write a block and a half
+      AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
+
+      // Make sure all the blocks are written before failover
+      stm.hflush();
+
+      LOG.info("Failing over to NN 1");
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+
+      assertTrue(fs.exists(TEST_PATH));
+
+      cluster.stopDataNode(0);
+
+      // write another block and a half
+      AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
+      stm.hflush(); // TODO: see above
+
+      LOG.info("Failing back to NN 0");
+      cluster.transitionToStandby(1);
+      cluster.transitionToActive(0);
+
+      cluster.stopDataNode(1);
+
+      AppendTestUtil.write(stm, BLOCK_AND_A_HALF*2, BLOCK_AND_A_HALF);
+      stm.hflush(); // TODO: see above
+
+      stm.close();
+      stm = null;
+
+      AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF * 3);
+    } finally {
+      IOUtils.closeStream(stm);
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests lease recovery if a client crashes. This approximates the
+   * use case of HBase WALs being recovered after a NN failover.
+   */
+  @Test(timeout=30000)
+  public void testLeaseRecoveryAfterFailover() throws Exception {
+    final Configuration conf = new Configuration();
+    // Disable permissions so that another user can recover the lease.
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+    FSDataOutputStream stm = null;
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(3)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      Thread.sleep(500);
+
+      LOG.info("Starting with NN 0 active");
+      FileSystem fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
+      stm = fs.create(TEST_PATH);
+
+      // write a block and a half
+      AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
+      stm.hflush();
+
+      LOG.info("Failing over to NN 1");
+
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+
+      assertTrue(fs.exists(TEST_PATH));
+
+      FileSystem fsOtherUser = UserGroupInformation.createUserForTesting(
+          "otheruser", new String[] { "othergroup" })
+          .doAs(new PrivilegedExceptionAction<FileSystem>() {
+            @Override
+            public FileSystem run() throws Exception {
+              return TestDFSClientFailover.configureFailoverFs(cluster, conf);
+            }
+          });
+      ((DistributedFileSystem)fsOtherUser).recoverLease(TEST_PATH);
+
+      AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
+    } finally {
+      IOUtils.closeStream(stm);
+      cluster.shutdown();
+    }
+  }
+
+}
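The lease recovery step in the last test uses DistributedFileSystem.recoverLease(), which predates this patch; a caller outside the tests would typically poll it until it succeeds (a sketch; the path is arbitrary):

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    boolean closed = dfs.recoverLease(new Path("/test-file"));
    while (!closed) {                 // returns true once the file is closed
      Thread.sleep(1000);
      closed = dfs.recoverLease(new Path("/test-file"));
    }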