HDFS-2301. Start/stop appropriate namenode services when transition to active and standby states. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1182080 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-10-11 20:44:34 +00:00
parent 8b4f497af8
commit f00198b16c
8 changed files with 274 additions and 122 deletions

View File

@ -11,3 +11,5 @@ HDFS-1974. Introduce active and standy states to the namenode. (suresh)
HDFS-2407. getServerDefaults and getStats don't check operation category (atm)
HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm)
HDFS-2301. Start/stop appropriate namenode services when transition to active and standby states. (suresh)

View File

@ -81,13 +81,13 @@ public class BackupNode extends NameNode {
// Common NameNode methods implementation for backup node.
/////////////////////////////////////////////////////
@Override // NameNode
protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
protected InetSocketAddress getRpcServerAddress(Configuration conf) {
String addr = conf.get(BN_ADDRESS_NAME_KEY, BN_ADDRESS_DEFAULT);
return NetUtils.createSocketAddr(addr);
}
@Override
protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) throws IOException {
protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) {
String addr = conf.get(BN_SERVICE_RPC_ADDRESS_KEY);
if (addr == null || addr.isEmpty()) {
return null;
@ -135,11 +135,6 @@ protected void initialize(Configuration conf) throws IOException {
CommonConfigurationKeys.FS_TRASH_INTERVAL_DEFAULT);
NamespaceInfo nsInfo = handshake(conf);
super.initialize(conf);
// Backup node should never do lease recovery,
// therefore lease hard limit should never expire.
namesystem.leaseManager.setLeasePeriod(
HdfsConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE);
clusterId = nsInfo.getClusterID();
blockPoolId = nsInfo.getBlockPoolID();

View File

@ -130,7 +130,6 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
@ -347,28 +346,30 @@ void loadFSImage(StartupOption startOpt, FSImage fsImage)
dir.imageLoadComplete();
}
void activateSecretManager() throws IOException {
void startSecretManager() throws IOException {
if (dtSecretManager != null) {
dtSecretManager.startThreads();
}
}
/**
* Activate FSNamesystem daemons.
void stopSecretManager() {
if (dtSecretManager != null) {
dtSecretManager.stopThreads();
}
}
/**
* Start services common to both active and standby states
* @throws IOException
*/
void activate(Configuration conf) throws IOException {
void startCommonServices(Configuration conf) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
setBlockTotal();
blockManager.activate(conf);
this.lmthread = new Daemon(leaseManager.new Monitor());
lmthread.start();
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
nnrmthread.start();
} finally {
@ -378,7 +379,70 @@ void activate(Configuration conf) throws IOException {
registerMXBean();
DefaultMetricsSystem.instance().register(this);
}
/**
* Stop services common to both active and standby states
* @throws IOException
*/
void stopCommonServices() {
writeLock();
try {
if (blockManager != null) blockManager.close();
if (nnrmthread != null) nnrmthread.interrupt();
} finally {
writeUnlock();
}
}
/**
* Start services required in active state
* @throws IOException
*/
void startActiveServices() throws IOException {
LOG.info("Starting services required for active state");
writeLock();
try {
startSecretManager();
lmthread = new Daemon(leaseManager.new Monitor());
lmthread.start();
} finally {
writeUnlock();
}
}
/**
* Start services required in active state
* @throws InterruptedException
*/
void stopActiveServices() {
LOG.info("Stopping services started for active state");
writeLock();
try {
stopSecretManager();
if (lmthread != null) {
try {
lmthread.interrupt();
lmthread.join(3000);
} catch (InterruptedException ie) {
LOG.warn("Encountered exception ", ie);
}
lmthread = null;
}
} finally {
writeUnlock();
}
}
/** Start services required in standby state */
void startStandbyServices() {
LOG.info("Starting services required for standby state");
}
/** Stop services required in standby state */
void stopStandbyServices() {
LOG.info("Stopping services started for standby state");
}
public static Collection<URI> getNamespaceDirs(Configuration conf) {
return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
}
@ -502,7 +566,7 @@ NamespaceInfo getNamespaceInfo() {
}
/**
* Version of {@see #getNamespaceInfo()} that is not protected by a lock.
* Version of @see #getNamespaceInfo() that is not protected by a lock.
*/
NamespaceInfo unprotectedGetNamespaceInfo() {
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
@ -519,23 +583,16 @@ NamespaceInfo unprotectedGetNamespaceInfo() {
void close() {
fsRunning = false;
try {
if (blockManager != null) blockManager.close();
stopCommonServices();
if (smmthread != null) smmthread.interrupt();
if (dtSecretManager != null) dtSecretManager.stopThreads();
if (nnrmthread != null) nnrmthread.interrupt();
} catch (Exception e) {
LOG.warn("Exception shutting down FSNamesystem", e);
} finally {
// using finally to ensure we also wait for lease daemon
try {
if (lmthread != null) {
lmthread.interrupt();
lmthread.join(3000);
}
stopActiveServices();
stopStandbyServices();
if (dir != null) {
dir.close();
}
} catch (InterruptedException ie) {
} catch (IOException ie) {
LOG.error("Error closing FSDirectory", ie);
IOUtils.cleanup(LOG, dir);
@ -1386,7 +1443,7 @@ LocatedBlock appendFile(String src, String holder, String clientMachine)
try {
lb = startFileInternal(src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, blockManager.maxReplication, (long)0);
false, blockManager.maxReplication, 0);
} finally {
writeUnlock();
}
@ -1469,7 +1526,7 @@ LocatedBlock getAdditionalBlock(String src,
fileLength = pendingFile.computeContentSummary().getLength();
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication = (int)pendingFile.getReplication();
replication = pendingFile.getReplication();
} finally {
writeUnlock();
}
@ -2264,7 +2321,7 @@ private Lease reassignLease(Lease lease, String src, String newHolder,
}
Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) throws IOException {
INodeFileUnderConstruction pendingFile) {
assert hasWriteLock();
pendingFile.setClientName(newHolder);
return leaseManager.reassignLease(lease, src, newHolder);
@ -2869,13 +2926,9 @@ private SafeModeInfo(boolean resourcesLow) {
* @return true if in safe mode
*/
private synchronized boolean isOn() {
try {
assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
+ "Total num of blocks, active blocks, or "
+ "total safe blocks don't match.";
} catch(IOException e) {
System.err.print(StringUtils.stringifyException(e));
}
assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
+ "Total num of blocks, active blocks, or "
+ "total safe blocks don't match.";
return this.reached >= 0;
}
@ -3029,7 +3082,7 @@ private synchronized void setBlockTotal(int total) {
this.blockTotal = total;
this.blockThreshold = (int) (blockTotal * threshold);
this.blockReplQueueThreshold =
(int) (((double) blockTotal) * replQueueThreshold);
(int) (blockTotal * replQueueThreshold);
checkMode();
}
@ -3039,7 +3092,7 @@ private synchronized void setBlockTotal(int total) {
* @param replication current replication
*/
private synchronized void incrementSafeBlockCount(short replication) {
if ((int)replication == safeReplication)
if (replication == safeReplication)
this.blockSafe++;
checkMode();
}
@ -3172,7 +3225,7 @@ public String toString() {
* Checks consistency of the class state.
* This is costly and currently called only in assert.
*/
private boolean isConsistent() throws IOException {
private boolean isConsistent() {
if (blockTotal == -1 && blockSafe == -1) {
return true; // manual safe mode
}

View File

@ -38,15 +38,13 @@
import org.apache.hadoop.fs.Trash;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@ -55,9 +53,6 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
@ -172,19 +167,18 @@ public long getProtocolVersion(String protocol,
}
}
public static final int DEFAULT_PORT = 8020;
public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
public static final HAState ACTIVE_STATE = new ActiveState();
public static final HAState STANDBY_STATE = new StandbyState();
protected FSNamesystem namesystem;
protected final Configuration conf;
protected NamenodeRole role;
private HAState state;
private final boolean haEnabled;
private final HAContext haContext;
/** httpServer */
@ -313,12 +307,11 @@ boolean isRole(NamenodeRole that) {
* Given a configuration get the address of the service rpc server
* If the service rpc is not configured returns null
*/
protected InetSocketAddress getServiceRpcServerAddress(Configuration conf)
throws IOException {
protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) {
return NameNode.getServiceAddress(conf, false);
}
protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
protected InetSocketAddress getRpcServerAddress(Configuration conf) {
return getAddress(conf);
}
@ -396,7 +389,7 @@ protected void initialize(Configuration conf) throws IOException {
throw e;
}
activate(conf);
startCommonServices(conf);
}
/**
@ -430,19 +423,11 @@ protected void validateConfigurationSettings(final Configuration conf)
}
}
/**
* Activate name-node servers and threads.
*/
void activate(Configuration conf) throws IOException {
if ((isRole(NamenodeRole.NAMENODE))
&& (UserGroupInformation.isSecurityEnabled())) {
namesystem.activateSecretManager();
}
namesystem.activate(conf);
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
namesystem.startCommonServices(conf);
startHttpServer(conf);
rpcServer.start();
startTrashEmptier(conf);
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
ServicePlugin.class);
for (ServicePlugin p: plugins) {
@ -452,13 +437,28 @@ void activate(Configuration conf) throws IOException {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
LOG.info(getRole() + " up at: " + rpcServer.getRpcAddress());
if (rpcServer.getServiceRpcAddress() != null) {
LOG.info(getRole() + " service server is up at: " + rpcServer.getServiceRpcAddress());
LOG.info(getRole() + " service server is up at: "
+ rpcServer.getServiceRpcAddress());
}
}
private void stopCommonServices() {
if(namesystem != null) namesystem.close();
if(rpcServer != null) rpcServer.stop();
if (plugins != null) {
for (ServicePlugin p : plugins) {
try {
p.stop();
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be stopped", t);
}
}
}
stopHttpServer();
}
private void startTrashEmptier(Configuration conf) throws IOException {
long trashInterval
= conf.getLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY,
@ -470,11 +470,26 @@ private void startTrashEmptier(Configuration conf) throws IOException {
this.emptier.start();
}
private void stopTrashEmptier() {
if (this.emptier != null) {
emptier.interrupt();
emptier = null;
}
}
private void startHttpServer(final Configuration conf) throws IOException {
httpServer = new NameNodeHttpServer(conf, this, getHttpServerAddress(conf));
httpServer.start();
setHttpServerAddress(conf);
}
private void stopHttpServer() {
try {
if (httpServer != null) httpServer.stop();
} catch (Exception e) {
LOG.error("Exception while stopping httpserver", e);
}
}
/**
* Start NameNode.
@ -509,18 +524,28 @@ public NameNode(Configuration conf) throws IOException {
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
this.conf = conf;
this.role = role;
this.haEnabled = DFSUtil.isHAEnabled(conf);
this.state = !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
this.haContext = new NameNodeHAContext();
try {
initializeGenericKeys(conf, getNameServiceId(conf));
initialize(conf);
if (!haEnabled) {
state = ACTIVE_STATE;
} else {
state = STANDBY_STATE;;
}
state.enterState(haContext);
} catch (IOException e) {
this.stop();
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stop();
throw e;
} catch (ServiceFailedException e) {
this.stop();
throw new IOException("Service failed to start", e);
}
}
@ -532,6 +557,7 @@ public void join() {
try {
this.rpcServer.join();
} catch (InterruptedException ie) {
LOG.info("Caught interrupted exception ", ie);
}
}
@ -544,23 +570,12 @@ public void stop() {
return;
stopRequested = true;
}
if (plugins != null) {
for (ServicePlugin p : plugins) {
try {
p.stop();
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be stopped", t);
}
}
}
try {
if (httpServer != null) httpServer.stop();
} catch (Exception e) {
LOG.error("Exception while stopping httpserver", e);
state.exitState(haContext);
} catch (ServiceFailedException e) {
LOG.warn("Encountered exception while exiting state ", e);
}
if(namesystem != null) namesystem.close();
if(emptier != null) emptier.interrupt();
if(rpcServer != null) rpcServer.stop();
stopCommonServices();
if (metrics != null) {
metrics.shutdown();
}
@ -876,27 +891,61 @@ synchronized void transitionToActive() throws ServiceFailedException {
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
state.setState(this, ACTIVE_STATE);
state.setState(haContext, ACTIVE_STATE);
}
synchronized void transitionToStandby() throws ServiceFailedException {
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
state.setState(this, STANDBY_STATE);
state.setState(haContext, STANDBY_STATE);
}
/** Check if an operation of given category is allowed */
protected synchronized void checkOperation(final OperationCategory op)
throws UnsupportedActionException {
state.checkOperation(this, op);
state.checkOperation(haContext, op);
}
public synchronized HAState getState() {
return state;
}
public synchronized void setState(final HAState s) {
state = s;
/**
* Class used as expose {@link NameNode} as context to {@link HAState}
*
* TODO:HA
* When entering and exiting state, on failing to start services,
* appropriate action is needed todo either shutdown the node or recover
* from failure.
*/
private class NameNodeHAContext implements HAContext {
@Override
public void setState(HAState s) {
state = s;
}
@Override
public HAState getState() {
return state;
}
@Override
public void startActiveServices() throws IOException {
namesystem.startActiveServices();
startTrashEmptier(conf);
}
@Override
public void stopActiveServices() throws IOException {
namesystem.stopActiveServices();
stopTrashEmptier();
}
@Override
public void startStandbyServices() throws IOException {
// TODO:HA Start reading editlog from active
}
@Override
public void stopStandbyServices() throws IOException {
// TODO:HA Stop reading editlog from active
}
}
}

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
@ -27,33 +30,42 @@
* service and handles operations of type {@link OperationCategory#WRITE} and
* {@link OperationCategory#READ}.
*/
@InterfaceAudience.Private
public class ActiveState extends HAState {
public ActiveState() {
super("active");
}
@Override
public void checkOperation(NameNode nn, OperationCategory op)
public void checkOperation(HAContext context, OperationCategory op)
throws UnsupportedActionException {
return; // Other than journal all operations are allowed in active state
}
@Override
public void setState(NameNode nn, HAState s) throws ServiceFailedException {
public void setState(HAContext context, HAState s) throws ServiceFailedException {
if (s == NameNode.STANDBY_STATE) {
setStateInternal(nn, s);
setStateInternal(context, s);
return;
}
super.setState(nn, s);
super.setState(context, s);
}
@Override
protected void enterState(NameNode nn) throws ServiceFailedException {
// TODO:HA
public void enterState(HAContext context) throws ServiceFailedException {
try {
context.startActiveServices();
} catch (IOException e) {
throw new ServiceFailedException("Failed to start active services", e);
}
}
@Override
protected void exitState(NameNode nn) throws ServiceFailedException {
// TODO:HA
public void exitState(HAContext context) throws ServiceFailedException {
try {
context.stopActiveServices();
} catch (IOException e) {
throw new ServiceFailedException("Failed to stop active services", e);
}
}
}

View File

@ -0,0 +1,30 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Context that is to be used by {@link HAState} for getting/setting the
* current state and performing required operations.
*/
@InterfaceAudience.Private
public interface HAContext {
/** Set the state of the context to given {@code state} */
public void setState(HAState state);
/** Get the state from the context */
public HAState getState();
/** Start the services required in active state */
public void startActiveServices() throws IOException;
/** Stop the services when exiting active state */
public void stopActiveServices() throws IOException;
/** Start the services required in standby state */
public void startStandbyServices() throws IOException;
/** Stop the services when exiting standby state */
public void stopStandbyServices() throws IOException;
}

View File

@ -19,7 +19,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
@ -44,38 +43,38 @@ public HAState(String name) {
* @param s new state
* @throws ServiceFailedException on failure to transition to new state.
*/
protected final void setStateInternal(final NameNode nn, final HAState s)
protected final void setStateInternal(final HAContext context, final HAState s)
throws ServiceFailedException {
exitState(nn);
nn.setState(s);
s.enterState(nn);
exitState(context);
context.setState(s);
s.enterState(context);
}
/**
* Method to be overridden by subclasses to perform steps necessary for
* entering a state.
* @param nn Namenode
* @param context HA context
* @throws ServiceFailedException on failure to enter the state.
*/
protected abstract void enterState(final NameNode nn)
public abstract void enterState(final HAContext context)
throws ServiceFailedException;
/**
* Method to be overridden by subclasses to perform steps necessary for
* exiting a state.
* @param nn Namenode
* @param context HA context
* @throws ServiceFailedException on failure to enter the state.
*/
protected abstract void exitState(final NameNode nn)
public abstract void exitState(final HAContext context)
throws ServiceFailedException;
/**
* Move from the existing state to a new state
* @param nn Namenode
* @param context HA context
* @param s new state
* @throws ServiceFailedException on failure to transition to new state.
*/
public void setState(NameNode nn, HAState s) throws ServiceFailedException {
public void setState(HAContext context, HAState s) throws ServiceFailedException {
if (this == s) { // Aleady in the new state
return;
}
@ -85,15 +84,15 @@ public void setState(NameNode nn, HAState s) throws ServiceFailedException {
/**
* Check if an operation is supported in a given state.
* @param nn Namenode
* @param context HA context
* @param op Type of the operation.
* @throws UnsupportedActionException if a given type of operation is not
* supported in this state.
*/
public void checkOperation(final NameNode nn, final OperationCategory op)
public void checkOperation(final HAContext context, final OperationCategory op)
throws UnsupportedActionException {
String msg = "Operation category " + op + " is not supported in state "
+ nn.getState();
+ context.getState();
throw new UnsupportedActionException(msg);
}

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -31,28 +34,37 @@
*
* It does not handle read/write/checkpoint operations.
*/
@InterfaceAudience.Private
public class StandbyState extends HAState {
public StandbyState() {
super("standby");
}
@Override
public void setState(NameNode nn, HAState s) throws ServiceFailedException {
public void setState(HAContext context, HAState s) throws ServiceFailedException {
if (s == NameNode.ACTIVE_STATE) {
setStateInternal(nn, s);
setStateInternal(context, s);
return;
}
super.setState(nn, s);
super.setState(context, s);
}
@Override
protected void enterState(NameNode nn) throws ServiceFailedException {
// TODO:HA
public void enterState(HAContext context) throws ServiceFailedException {
try {
context.startStandbyServices();
} catch (IOException e) {
throw new ServiceFailedException("Failed to start standby services", e);
}
}
@Override
protected void exitState(NameNode nn) throws ServiceFailedException {
// TODO:HA
public void exitState(HAContext context) throws ServiceFailedException {
try {
context.stopStandbyServices();
} catch (IOException e) {
throw new ServiceFailedException("Failed to stop standby services", e);
}
}
}