HDFS-2691. Fixes for pipeline recovery in an HA cluster: report RBW replicas immediately upon pipeline creation. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1237935 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-01-30 19:16:15 +00:00
parent 6122357da5
commit 846f97312c
16 changed files with 456 additions and 56 deletions

View File

@ -137,3 +137,5 @@ HDFS-2838. NPE in FSNamesystem when in safe mode. (Gregory Chanan via eli)
HDFS-2805. Add a test for a federated cluster with HA NNs. (Brandon Li via jitendra)
HDFS-2841. HAAdmin does not work if security is enabled. (atm)
HDFS-2691. Fixes for pipeline recovery in an HA cluster: report RBW replicas immediately upon pipeline creation. (todd)

View File

@ -116,6 +116,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@ -813,6 +814,23 @@ public static ReceivedDeletedBlockInfoProto convert(
ReceivedDeletedBlockInfoProto.Builder builder =
ReceivedDeletedBlockInfoProto.newBuilder();
ReceivedDeletedBlockInfoProto.BlockStatus status;
switch (receivedDeletedBlockInfo.getStatus()) {
case RECEIVING_BLOCK:
status = ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVING;
break;
case RECEIVED_BLOCK:
status = ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVED;
break;
case DELETED_BLOCK:
status = ReceivedDeletedBlockInfoProto.BlockStatus.DELETED;
break;
default:
throw new IllegalArgumentException("Bad status: " +
receivedDeletedBlockInfo.getStatus());
}
builder.setStatus(status);
if (receivedDeletedBlockInfo.getDelHints() != null) {
builder.setDeleteHint(receivedDeletedBlockInfo.getDelHints());
}
@ -844,7 +862,21 @@ public static UpgradeCommandProto convert(UpgradeCommand comm) {
public static ReceivedDeletedBlockInfo convert(
ReceivedDeletedBlockInfoProto proto) {
return new ReceivedDeletedBlockInfo(PBHelper.convert(proto.getBlock()),
ReceivedDeletedBlockInfo.BlockStatus status = null;
switch (proto.getStatus()) {
case RECEIVING:
status = BlockStatus.RECEIVING_BLOCK;
break;
case RECEIVED:
status = BlockStatus.RECEIVED_BLOCK;
break;
case DELETED:
status = BlockStatus.DELETED_BLOCK;
break;
}
return new ReceivedDeletedBlockInfo(
PBHelper.convert(proto.getBlock()),
status,
proto.hasDeleteHint() ? proto.getDeleteHint() : null);
}

View File

@ -2256,13 +2256,19 @@ void addBlock(DatanodeDescriptor node, Block block, String delHint)
// Modify the blocks->datanode map and node's map.
//
pendingReplications.remove(block);
processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
delHintNode);
}
private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
processReportedBlock(node, block, ReplicaState.FINALIZED,
processReportedBlock(node, block, reportedState,
toAdd, toInvalidate, toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
@ -2286,47 +2292,66 @@ void addBlock(DatanodeDescriptor node, Block block, String delHint)
}
}
/** The given node is reporting that it received/deleted certain blocks. */
public void blockReceivedAndDeleted(final DatanodeID nodeID,
/**
* The given node is reporting incremental information about some blocks.
* This includes blocks that are starting to be received, completed being
* received, or deleted.
*/
public void processIncrementalBlockReport(final DatanodeID nodeID,
final String poolId,
final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
final ReceivedDeletedBlockInfo blockInfos[]
) throws IOException {
namesystem.writeLock();
int received = 0;
int deleted = 0;
int receiving = 0;
try {
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
NameNode.stateChangeLog
.warn("BLOCK* blockReceivedDeleted"
.warn("BLOCK* processIncrementalBlockReport"
+ " is received from dead or unregistered node "
+ nodeID.getName());
throw new IOException(
"Got blockReceivedDeleted message from unregistered or dead node");
"Got incremental block report from unregistered or dead node");
}
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
removeStoredBlock(
receivedAndDeletedBlocks[i].getBlock(), node);
for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
removeStoredBlock(rdbi.getBlock(), node);
deleted++;
} else {
addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
receivedAndDeletedBlocks[i].getDelHints());
break;
case RECEIVED_BLOCK:
addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
received++;
break;
case RECEIVING_BLOCK:
receiving++;
processAndHandleReportedBlock(node, rdbi.getBlock(),
ReplicaState.RBW, null);
break;
default:
String msg =
"Unknown block status code reported by " + nodeID.getName() +
": " + rdbi;
NameNode.stateChangeLog.warn(msg);
assert false : msg; // if assertions are enabled, throw.
break;
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* block"
+ (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
: "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
NameNode.stateChangeLog.debug("BLOCK* block "
+ (rdbi.getStatus()) + ": " + rdbi.getBlock()
+ " is received from " + nodeID.getName());
}
}
} finally {
namesystem.writeUnlock();
NameNode.stateChangeLog
.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+ nodeID.getName() + " received: " + received + ", "
.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+ nodeID.getName()
+ " receiving: " + receiving + ", "
+ " received: " + received + ", "
+ " deleted: " + deleted);
}
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import com.google.common.annotations.VisibleForTesting;
@ -202,10 +203,13 @@ void reportBadBlocks(ExtendedBlock block) {
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
checkBlock(block);
checkDelHint(delHint);
ReceivedDeletedBlockInfo bInfo =
new ReceivedDeletedBlockInfo(block.getLocalBlock(), delHint);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
delHint);
for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeReceivedBlock(bInfo);
actor.notifyNamenodeBlockImmediately(bInfo);
}
}
@ -224,13 +228,24 @@ private void checkDelHint(String delHint) {
void notifyNamenodeDeletedBlock(ExtendedBlock block) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(block
.getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeDeletedBlock(bInfo);
}
}
void notifyNamenodeReceivingBlock(ExtendedBlock block) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeBlockImmediately(bInfo);
}
}
//This must be called only by blockPoolManager
void start() {

View File

@ -267,7 +267,7 @@ private void reportReceivedDeletedBlocks() throws IOException {
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
void notifyNamenodeReceivedBlock(ReceivedDeletedBlockInfo bInfo) {
void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
synchronized (receivedAndDeletedBlockList) {
receivedAndDeletedBlockList.add(bInfo);
pendingReceivedRequests++;
@ -341,6 +341,12 @@ DatanodeCommand blockReport() throws IOException {
long startTime = now();
if (startTime - lastBlockReport > dnConf.blockReportInterval) {
// Flush any block information that precedes the block report. Otherwise
// we have a chance that we will miss the delHint information
// or we will report an RBW replica after the BlockReport already reports
// a FINALIZED one.
reportReceivedDeletedBlocks();
// Create block report
long brCreateStartTime = now();
BlockListAsLongs bReport = dn.getFSDataset().getBlockReport(

View File

@ -153,6 +153,7 @@ class BlockReceiver implements Closeable {
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(block);
datanode.notifyNamenodeReceivingBlock(block);
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw(
@ -166,6 +167,7 @@ class BlockReceiver implements Closeable {
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(block);
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@ -174,6 +176,7 @@ class BlockReceiver implements Closeable {
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(block);
break;
case TRANSFER_RBW:
case TRANSFER_FINALIZED:

View File

@ -522,7 +522,18 @@ protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint)
if(bpos != null) {
bpos.notifyNamenodeReceivedBlock(block, delHint);
} else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
LOG.error("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
}
// calls specific to BP
protected void notifyNamenodeReceivingBlock(ExtendedBlock block) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
bpos.notifyNamenodeReceivingBlock(block);
} else {
LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
+ block.getBlockPoolId());
}
}
@ -533,7 +544,7 @@ protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
if (bpos != null) {
bpos.notifyNamenodeDeletedBlock(block);
} else {
LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
+ block.getBlockPoolId());
}
}

View File

@ -4933,7 +4933,7 @@ public void notifyGenStampUpdate(long gs) {
+ m.getNodeReg().getName() + " "
+ m.getReceivedAndDeletedBlocks().length + " blocks.");
}
this.getBlockManager().blockReceivedAndDeleted(m.getNodeReg(),
this.getBlockManager().processIncrementalBlockReport(m.getNodeReg(),
m.getPoolId(), m.getReceivedAndDeletedBlocks());
break;
case BLOCK_REPORT:

View File

@ -928,7 +928,7 @@ public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
+"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
+" blocks.");
}
namesystem.getBlockManager().blockReceivedAndDeleted(
namesystem.getBlockManager().processIncrementalBlockReport(
nodeReg, poolId, receivedAndDeletedBlocks);
}

View File

@ -25,22 +25,47 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* A data structure to store Block and delHints together, used to send
* received/deleted ACKs.
* A data structure to store the blocks in an incremental block report.
*/
public class ReceivedDeletedBlockInfo implements Writable {
Block block;
BlockStatus status;
String delHints;
public final static String TODELETE_HINT = "-";
public static enum BlockStatus {
RECEIVING_BLOCK(1),
RECEIVED_BLOCK(2),
DELETED_BLOCK(3);
private final int code;
BlockStatus(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public static BlockStatus fromCode(int code) {
for (BlockStatus bs : BlockStatus.values()) {
if (bs.code == code) {
return bs;
}
}
return null;
}
}
public ReceivedDeletedBlockInfo() {
}
public ReceivedDeletedBlockInfo(Block blk, String delHints) {
public ReceivedDeletedBlockInfo(
Block blk, BlockStatus status, String delHints) {
this.block = blk;
this.status = status;
this.delHints = delHints;
}
@ -60,13 +85,19 @@ public void setDelHints(String hints) {
this.delHints = hints;
}
public BlockStatus getStatus() {
return status;
}
public boolean equals(Object o) {
if (!(o instanceof ReceivedDeletedBlockInfo)) {
return false;
}
ReceivedDeletedBlockInfo other = (ReceivedDeletedBlockInfo) o;
return this.block.equals(other.getBlock())
&& this.delHints.equals(other.delHints);
&& this.status == other.status
&& (this.delHints == other.delHints ||
this.delHints != null && this.delHints.equals(other.delHints));
}
public int hashCode() {
@ -79,23 +110,30 @@ public boolean blockEquals(Block b) {
}
public boolean isDeletedBlock() {
return delHints.equals(TODELETE_HINT);
return status == BlockStatus.DELETED_BLOCK;
}
@Override
public void write(DataOutput out) throws IOException {
this.block.write(out);
Text.writeString(out, this.delHints);
WritableUtils.writeVInt(out, this.status.code);
if (this.status == BlockStatus.DELETED_BLOCK) {
Text.writeString(out, this.delHints);
}
}
@Override
public void readFields(DataInput in) throws IOException {
this.block = new Block();
this.block.readFields(in);
this.delHints = Text.readString(in);
this.status = BlockStatus.fromCode(WritableUtils.readVInt(in));
if (this.status == BlockStatus.DELETED_BLOCK) {
this.delHints = Text.readString(in);
}
}
public String toString() {
return block.toString() + ", delHint: " + delHints;
return block.toString() + ", status: " + status +
", delHint: " + delHints;
}
}

View File

@ -24,8 +24,10 @@
import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* A data structure to store Block and delHints together, used to send
@ -33,33 +35,43 @@
*/
public class ReceivedDeletedBlockInfoWritable implements Writable {
BlockWritable block;
int statusCode;
String delHints;
public final static String TODELETE_HINT = "-";
public ReceivedDeletedBlockInfoWritable() {
}
public ReceivedDeletedBlockInfoWritable(BlockWritable blk, String delHints) {
public ReceivedDeletedBlockInfoWritable(
BlockWritable blk, int statusCode, String delHints) {
this.block = blk;
this.statusCode = statusCode;
this.delHints = delHints;
}
@Override
public void write(DataOutput out) throws IOException {
this.block.write(out);
Text.writeString(out, this.delHints);
WritableUtils.writeVInt(out, this.statusCode);
if (this.statusCode == BlockStatus.DELETED_BLOCK.getCode()) {
Text.writeString(out, this.delHints);
}
}
@Override
public void readFields(DataInput in) throws IOException {
this.block = new BlockWritable();
this.block.readFields(in);
this.delHints = Text.readString(in);
this.statusCode = WritableUtils.readVInt(in);
if (this.statusCode == BlockStatus.DELETED_BLOCK.getCode()) {
this.delHints = Text.readString(in);
}
}
public String toString() {
return block.toString() + ", delHint: " + delHints;
return block.toString() + ", statusCode: " + statusCode +
", delHint: " + delHints;
}
public static ReceivedDeletedBlockInfo[] convert(
@ -83,13 +95,16 @@ public static ReceivedDeletedBlockInfoWritable[] convert(
}
public ReceivedDeletedBlockInfo convert() {
return new ReceivedDeletedBlockInfo(block.convert(), delHints);
return new ReceivedDeletedBlockInfo(block.convert(),
BlockStatus.fromCode(statusCode), delHints);
}
public static ReceivedDeletedBlockInfoWritable convert(
ReceivedDeletedBlockInfo b) {
if (b == null) return null;
return new ReceivedDeletedBlockInfoWritable(BlockWritable.convert(b
.getBlock()), b.getDelHints());
return new ReceivedDeletedBlockInfoWritable(
BlockWritable.convert(b.getBlock()),
b.getStatus().getCode(),
b.getDelHints());
}
}

View File

@ -213,12 +213,16 @@ message BlockReportResponseProto {
/**
* Data structure to send received or deleted block information
* from datanode to namenode.
*
* deleteHint set to "-" indicates block deletion.
* other deleteHint indicates block addition.
*/
message ReceivedDeletedBlockInfoProto {
enum BlockStatus {
RECEIVING = 1; // block being created
RECEIVED = 2; // block creation complete
DELETED = 3;
}
required BlockProto block = 1;
required BlockStatus status = 3;
optional string deleteHint = 2;
}
@ -329,7 +333,9 @@ service DatanodeProtocolService {
rpc blockReport(BlockReportRequestProto) returns(BlockReportResponseProto);
/**
* Report from datanode about recently received or deleted block
* Incremental block report from the DN. This contains info about recently
* received and deleted blocks, as well as when blocks start being
* received.
*/
rpc blockReceivedAndDeleted(BlockReceivedAndDeletedRequestProto)
returns(BlockReceivedAndDeletedResponseProto);

View File

@ -113,8 +113,14 @@ public static void check(FileSystem fs, Path p, long length) throws IOException
int i = -1;
try {
final FileStatus status = fs.getFileStatus(p);
TestCase.assertEquals(length, status.getLen());
InputStream in = fs.open(p);
FSDataInputStream in = fs.open(p);
if (in.getWrappedStream() instanceof DFSInputStream) {
long len = ((DFSInputStream)in.getWrappedStream()).getFileLength();
TestCase.assertEquals(length, len);
} else {
TestCase.assertEquals(length, status.getLen());
}
for(i++; i < length; i++) {
TestCase.assertEquals((byte)i, (byte)in.read());
}

View File

@ -884,7 +884,8 @@ private int transferBlocks( Block blocks[],
nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
.getNamesystem().getBlockPoolId(),
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
blocks[i], DataNode.EMPTY_DEL_HINT) });
blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
null) });
}
}
return blocks.length;
@ -999,7 +1000,8 @@ private ExtendedBlock addBlocks(String fileName, String clientName)
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
.getBlock().getBlockPoolId(),
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
.getBlock().getLocalBlock(), "") });
.getBlock().getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) });
}
}
return prevBlock;

View File

@ -107,7 +107,9 @@ public void testDeadDatanode() throws Exception {
DatanodeProtocol dnp = cluster.getNameNodeRpc();
ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
new Block(0), "") };
new Block(0),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
null) };
// Ensure blockReceived call from dead datanode is rejected with IOException
try {

View File

@ -0,0 +1,237 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.TestDFSClientFailover;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.Ignore;
import org.junit.Test;
/**
* Test cases regarding pipeline recovery during NN failover.
*/
public class TestPipelinesFailover {
static {
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(
"org.apache.hadoop.io.retry.RetryInvocationHandler")).getLogger().setLevel(Level.ALL);
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
}
protected static final Log LOG = LogFactory.getLog(
TestPipelinesFailover.class);
private static final Path TEST_PATH =
new Path("/test-file");
private static final int BLOCK_SIZE = 4096;
private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;
/**
* Tests continuing a write pipeline over a failover.
*/
@Test(timeout=30000)
public void testWriteOverFailover() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
// Don't check replication periodically.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
FSDataOutputStream stm = null;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(3)
.build();
try {
cluster.waitActive();
cluster.transitionToActive(0);
Thread.sleep(500);
LOG.info("Starting with NN 0 active");
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
stm = fs.create(TEST_PATH);
// write a block and a half
AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
// Make sure all of the blocks are written out before failover.
stm.hflush();
LOG.info("Failing over to NN 1");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
assertTrue(fs.exists(TEST_PATH));
FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
BlockManagerTestUtil.updateState(ns1.getBlockManager());
assertEquals(0, ns1.getPendingReplicationBlocks());
assertEquals(0, ns1.getCorruptReplicaBlocks());
assertEquals(0, ns1.getMissingBlocksCount());
// write another block and a half
AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
stm.close();
stm = null;
AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE * 3);
} finally {
IOUtils.closeStream(stm);
cluster.shutdown();
}
}
/**
* Tests continuing a write pipeline over a failover when a DN fails
* after the failover - ensures that updating the pipeline succeeds
* even when the pipeline was constructed on a different NN.
*/
@Test(timeout=30000)
public void testWriteOverFailoverWithDnFail() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
FSDataOutputStream stm = null;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(5)
.build();
try {
cluster.waitActive();
cluster.transitionToActive(0);
Thread.sleep(500);
LOG.info("Starting with NN 0 active");
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
stm = fs.create(TEST_PATH);
// write a block and a half
AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
// Make sure all the blocks are written before failover
stm.hflush();
LOG.info("Failing over to NN 1");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
assertTrue(fs.exists(TEST_PATH));
cluster.stopDataNode(0);
// write another block and a half
AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
stm.hflush(); // TODO: see above
LOG.info("Failing back to NN 0");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.stopDataNode(1);
AppendTestUtil.write(stm, BLOCK_AND_A_HALF*2, BLOCK_AND_A_HALF);
stm.hflush(); // TODO: see above
stm.close();
stm = null;
AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF * 3);
} finally {
IOUtils.closeStream(stm);
cluster.shutdown();
}
}
/**
* Tests lease recovery if a client crashes. This approximates the
* use case of HBase WALs being recovered after a NN failover.
*/
@Test(timeout=30000)
public void testLeaseRecoveryAfterFailover() throws Exception {
final Configuration conf = new Configuration();
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
FSDataOutputStream stm = null;
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(3)
.build();
try {
cluster.waitActive();
cluster.transitionToActive(0);
Thread.sleep(500);
LOG.info("Starting with NN 0 active");
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
stm = fs.create(TEST_PATH);
// write a block and a half
AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
stm.hflush();
LOG.info("Failing over to NN 1");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
assertTrue(fs.exists(TEST_PATH));
FileSystem fsOtherUser = UserGroupInformation.createUserForTesting(
"otheruser", new String[] { "othergroup"})
.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return HATestUtil.configureFailoverFs(cluster, conf);
}
});
((DistributedFileSystem)fsOtherUser).recoverLease(TEST_PATH);
AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
} finally {
IOUtils.closeStream(stm);
cluster.shutdown();
}
}
}