HDFS-5014. Process register commands with out holding BPOfferService lock. Contributed by Vinay.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543861 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f26d2adbf9
commit
04cf2a768c
@ -692,6 +692,9 @@ Release 2.2.1 - UNRELEASED
|
|||||||
HDFS-4516. Client crash after block allocation and NN switch before lease recovery for
|
HDFS-4516. Client crash after block allocation and NN switch before lease recovery for
|
||||||
the same file can cause readers to fail forever (VinaayKumar B via umamahesh)
|
the same file can cause readers to fail forever (VinaayKumar B via umamahesh)
|
||||||
|
|
||||||
|
HDFS-5014. Process register commands with out holding BPOfferService lock.
|
||||||
|
(Vinaykumar B via umamahesh)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -73,7 +73,7 @@ class BPOfferService {
|
|||||||
* This is assigned after the second phase of the
|
* This is assigned after the second phase of the
|
||||||
* handshake.
|
* handshake.
|
||||||
*/
|
*/
|
||||||
DatanodeRegistration bpRegistration;
|
volatile DatanodeRegistration bpRegistration;
|
||||||
|
|
||||||
private final DataNode dn;
|
private final DataNode dn;
|
||||||
|
|
||||||
@ -295,7 +295,7 @@ synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOExcep
|
|||||||
* NN, it calls this function to verify that the NN it connected to
|
* NN, it calls this function to verify that the NN it connected to
|
||||||
* is consistent with other NNs serving the block-pool.
|
* is consistent with other NNs serving the block-pool.
|
||||||
*/
|
*/
|
||||||
void registrationSucceeded(BPServiceActor bpServiceActor,
|
synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
|
||||||
DatanodeRegistration reg) throws IOException {
|
DatanodeRegistration reg) throws IOException {
|
||||||
if (bpRegistration != null) {
|
if (bpRegistration != null) {
|
||||||
checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
|
checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
|
||||||
@ -497,17 +497,37 @@ void triggerHeartbeatForTests() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized boolean processCommandFromActor(DatanodeCommand cmd,
|
boolean processCommandFromActor(DatanodeCommand cmd,
|
||||||
BPServiceActor actor) throws IOException {
|
BPServiceActor actor) throws IOException {
|
||||||
assert bpServices.contains(actor);
|
assert bpServices.contains(actor);
|
||||||
if (actor == bpServiceToActive) {
|
if (cmd == null) {
|
||||||
return processCommandFromActive(cmd, actor);
|
return true;
|
||||||
} else {
|
}
|
||||||
return processCommandFromStandby(cmd, actor);
|
/*
|
||||||
|
* Datanode Registration can be done asynchronously here. No need to hold
|
||||||
|
* the lock. for more info refer HDFS-5014
|
||||||
|
*/
|
||||||
|
if (DatanodeProtocol.DNA_REGISTER == cmd.getAction()) {
|
||||||
|
// namenode requested a registration - at start or if NN lost contact
|
||||||
|
// Just logging the claiming state is OK here instead of checking the
|
||||||
|
// actor state by obtaining the lock
|
||||||
|
LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr
|
||||||
|
+ " with " + actor.state + " state");
|
||||||
|
actor.reRegister();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
synchronized (this) {
|
||||||
|
if (actor == bpServiceToActive) {
|
||||||
|
return processCommandFromActive(cmd, actor);
|
||||||
|
} else {
|
||||||
|
return processCommandFromStandby(cmd, actor);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* This method should handle all commands from Active namenode except
|
||||||
|
* DNA_REGISTER which should be handled earlier itself.
|
||||||
*
|
*
|
||||||
* @param cmd
|
* @param cmd
|
||||||
* @return true if further processing may be required or false otherwise.
|
* @return true if further processing may be required or false otherwise.
|
||||||
@ -515,8 +535,6 @@ synchronized boolean processCommandFromActor(DatanodeCommand cmd,
|
|||||||
*/
|
*/
|
||||||
private boolean processCommandFromActive(DatanodeCommand cmd,
|
private boolean processCommandFromActive(DatanodeCommand cmd,
|
||||||
BPServiceActor actor) throws IOException {
|
BPServiceActor actor) throws IOException {
|
||||||
if (cmd == null)
|
|
||||||
return true;
|
|
||||||
final BlockCommand bcmd =
|
final BlockCommand bcmd =
|
||||||
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
|
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
|
||||||
final BlockIdCommand blockIdCmd =
|
final BlockIdCommand blockIdCmd =
|
||||||
@ -560,11 +578,6 @@ private boolean processCommandFromActive(DatanodeCommand cmd,
|
|||||||
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
|
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
|
||||||
// See HDFS-2987.
|
// See HDFS-2987.
|
||||||
throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
|
throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
|
||||||
case DatanodeProtocol.DNA_REGISTER:
|
|
||||||
// namenode requested a registration - at start or if NN lost contact
|
|
||||||
LOG.info("DatanodeCommand action: DNA_REGISTER");
|
|
||||||
actor.reRegister();
|
|
||||||
break;
|
|
||||||
case DatanodeProtocol.DNA_FINALIZE:
|
case DatanodeProtocol.DNA_FINALIZE:
|
||||||
String bp = ((FinalizeCommand) cmd).getBlockPoolId();
|
String bp = ((FinalizeCommand) cmd).getBlockPoolId();
|
||||||
assert getBlockPoolId().equals(bp) :
|
assert getBlockPoolId().equals(bp) :
|
||||||
@ -604,16 +617,13 @@ assert getBlockPoolId().equals(bp) :
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method should handle commands from Standby namenode except
|
||||||
|
* DNA_REGISTER which should be handled earlier itself.
|
||||||
|
*/
|
||||||
private boolean processCommandFromStandby(DatanodeCommand cmd,
|
private boolean processCommandFromStandby(DatanodeCommand cmd,
|
||||||
BPServiceActor actor) throws IOException {
|
BPServiceActor actor) throws IOException {
|
||||||
if (cmd == null)
|
|
||||||
return true;
|
|
||||||
switch(cmd.getAction()) {
|
switch(cmd.getAction()) {
|
||||||
case DatanodeProtocol.DNA_REGISTER:
|
|
||||||
// namenode requested a registration - at start or if NN lost contact
|
|
||||||
LOG.info("DatanodeCommand action from standby: DNA_REGISTER");
|
|
||||||
actor.reRegister();
|
|
||||||
break;
|
|
||||||
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
||||||
LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE");
|
LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE");
|
||||||
if (dn.isBlockTokenEnabled) {
|
if (dn.isBlockTokenEnabled) {
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
@ -73,6 +74,7 @@ class BPServiceActor implements Runnable {
|
|||||||
|
|
||||||
static final Log LOG = DataNode.LOG;
|
static final Log LOG = DataNode.LOG;
|
||||||
final InetSocketAddress nnAddr;
|
final InetSocketAddress nnAddr;
|
||||||
|
HAServiceState state;
|
||||||
|
|
||||||
BPOfferService bpos;
|
BPOfferService bpos;
|
||||||
|
|
||||||
@ -569,6 +571,7 @@ private void offerService() throws Exception {
|
|||||||
// that we should actually process.
|
// that we should actually process.
|
||||||
bpos.updateActorStatesFromHeartbeat(
|
bpos.updateActorStatesFromHeartbeat(
|
||||||
this, resp.getNameNodeHaState());
|
this, resp.getNameNodeHaState());
|
||||||
|
state = resp.getNameNodeHaState().getState();
|
||||||
|
|
||||||
long startProcessCommands = now();
|
long startProcessCommands = now();
|
||||||
if (!processCommand(resp.getCommands()))
|
if (!processCommand(resp.getCommands()))
|
||||||
|
Loading…
Reference in New Issue
Block a user