HDFS-9239. DataNode Lifeline Protocol: an alternative protocol for reporting DataNode liveness. Contributed by Chris Nauroth.

This commit is contained in:
Chris Nauroth 2016-03-04 15:29:50 -08:00
parent 8e08861a14
commit 2759689d7d
26 changed files with 1129 additions and 39 deletions

View File

@ -290,6 +290,8 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `ReplaceBlockOpAvgTime` | Average time of block replace operations in milliseconds |
| `HeartbeatsNumOps` | Total number of heartbeats |
| `HeartbeatsAvgTime` | Average heartbeat time in milliseconds |
| `LifelinesNumOps` | Total number of lifeline messages |
| `LifelinesAvgTime` | Average lifeline message processing time in milliseconds |
| `BlockReportsNumOps` | Total number of block report operations |
| `BlockReportsAvgTime` | Average time of block report operations in milliseconds |
| `IncrementalBlockReportsNumOps` | Total number of incremental block report operations |

View File

@ -343,6 +343,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<includes>
<include>HdfsServer.proto</include>
<include>DatanodeProtocol.proto</include>
<include>DatanodeLifelineProtocol.proto</include>
<include>HAZKInfo.proto</include>
<include>InterDatanodeProtocol.proto</include>
<include>JournalProtocol.proto</include>

View File

@ -512,6 +512,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
public static final String DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY =
"dfs.datanode.lifeline.interval.seconds";
public static final String DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms";
public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 30000L;
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
@ -522,8 +524,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
public static final int DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT = 1;
public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY = "dfs.namenode.lifeline.handler.count";
public static final String DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY =
"dfs.namenode.lifeline.handler.ratio";
public static final float DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT =
0.1f;
public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY =
"dfs.namenode.lifeline.handler.count";
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";

View File

@ -27,6 +27,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
@ -569,6 +570,40 @@ public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddress
return addressList;
}
/**
* Returns list of InetSocketAddresses corresponding to lifeline RPC servers
* at namenodes from the configuration.
*
* @param conf configuration
* @return list of InetSocketAddress
* @throws IOException on error
*/
public static Map<String, Map<String, InetSocketAddress>>
getNNLifelineRpcAddressesForCluster(Configuration conf)
throws IOException {
Collection<String> parentNameServices = conf.getTrimmedStringCollection(
DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
if (parentNameServices.isEmpty()) {
parentNameServices = conf.getTrimmedStringCollection(
DFSConfigKeys.DFS_NAMESERVICES);
} else {
// Ensure that the internal service is indeed in the list of all available
// nameservices.
Set<String> availableNameServices = Sets.newHashSet(conf
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
for (String nsId : parentNameServices) {
if (!availableNameServices.contains(nsId)) {
throw new IOException("Unknown nameservice: " + nsId);
}
}
}
return DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, null,
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
}
/**
* Map a logical namenode ID to its lifeline address. Use the given
* nameservice if specified, or the configured one if none is given.

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is the client side translator to translate the requests made on
* {@link DatanodeLifelineProtocol} interfaces to the RPC server implementing
* {@link DatanodeLifelineProtocolPB}.
*/
@InterfaceAudience.Private
public class DatanodeLifelineProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, DatanodeLifelineProtocol, Closeable {
/** RpcController is not used and hence is set to null. */
private static final RpcController NULL_CONTROLLER = null;
private final DatanodeLifelineProtocolPB rpcProxy;
public DatanodeLifelineProtocolClientSideTranslatorPB(
InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DatanodeLifelineProtocolPB.class,
ProtobufRpcEngine.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
}
private static DatanodeLifelineProtocolPB createNamenode(
InetSocketAddress nameNodeAddr, Configuration conf,
UserGroupInformation ugi) throws IOException {
return RPC.getProxy(DatanodeLifelineProtocolPB.class,
RPC.getProtocolVersion(DatanodeLifelineProtocolPB.class), nameNodeAddr,
ugi, conf,
NetUtils.getSocketFactory(conf, DatanodeLifelineProtocolPB.class));
}
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
}
@Override
public void sendLifeline(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
.setFailedVolumes(failedVolumes);
builder.addAllReports(PBHelperClient.convertStorageReports(reports));
if (cacheCapacity != 0) {
builder.setCacheCapacity(cacheCapacity);
}
if (cacheUsed != 0) {
builder.setCacheUsed(cacheUsed);
}
if (volumeFailureSummary != null) {
builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
volumeFailureSummary));
}
try {
rpcProxy.sendLifeline(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
@Override // ProtocolMetaInterface
public boolean isMethodSupported(String methodName)
throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy,
DatanodeLifelineProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(DatanodeLifelineProtocolPB.class), methodName);
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol used by a DataNode to send lifeline messages to a NameNode.
*/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
@ProtocolInfo(
protocolName =
"org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface DatanodeLifelineProtocolPB extends
DatanodeLifelineProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.LifelineResponseProto;
import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Implementation for protobuf service that forwards requests
* received on {@link DatanodeLifelineProtocolPB} to the
* {@link DatanodeLifelineProtocol} server implementation.
*/
@InterfaceAudience.Private
public class DatanodeLifelineProtocolServerSideTranslatorPB implements
DatanodeLifelineProtocolPB {
private static final LifelineResponseProto VOID_LIFELINE_RESPONSE_PROTO =
LifelineResponseProto.newBuilder().build();
private final DatanodeLifelineProtocol impl;
public DatanodeLifelineProtocolServerSideTranslatorPB(
DatanodeLifelineProtocol impl) {
this.impl = impl;
}
@Override
public LifelineResponseProto sendLifeline(RpcController controller,
HeartbeatRequestProto request) throws ServiceException {
try {
final StorageReport[] report = PBHelperClient.convertStorageReports(
request.getReportsList());
VolumeFailureSummary volumeFailureSummary =
request.hasVolumeFailureSummary() ?
PBHelper.convertVolumeFailureSummary(
request.getVolumeFailureSummary()) : null;
impl.sendLifeline(PBHelper.convert(request.getRegistration()), report,
request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(), request.getXceiverCount(),
request.getFailedVolumes(), volumeFailureSummary);
return VOID_LIFELINE_RESPONSE_PROTO;
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -1495,6 +1495,47 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
return new DatanodeCommand[0];
}
/**
* Handles a lifeline message sent by a DataNode.
*
* @param nodeReg registration info for DataNode sending the lifeline
* @param reports storage reports from DataNode
* @param blockPoolId block pool ID
* @param cacheCapacity cache capacity at DataNode
* @param cacheUsed cache used at DataNode
* @param xceiverCount estimated count of transfer threads running at DataNode
* @param maxTransfers count of transfers running at DataNode
* @param failedVolumes count of failed volumes at DataNode
* @param volumeFailureSummary info on failed volumes at DataNode
* @throws IOException if there is an error
*/
public void handleLifeline(DatanodeRegistration nodeReg,
StorageReport[] reports, String blockPoolId, long cacheCapacity,
long cacheUsed, int xceiverCount, int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received handleLifeline from nodeReg = " + nodeReg);
}
DatanodeDescriptor nodeinfo = getDatanode(nodeReg);
if (nodeinfo == null) {
// This is null if the DataNode has not yet registered. We expect this
// will never happen, because the DataNode has logic to prevent sending
// lifeline messages until after initial registration is successful.
// Lifeline message handling can't send commands back to the DataNode to
// tell it to register, so simply exit.
return;
}
if (nodeinfo.isDisallowed()) {
// This is highly unlikely, because heartbeat handling is much more
// frequent and likely would have already sent the disallowed error.
// Lifeline messages are not intended to send any kind of control response
// back to the DataNode, so simply exit.
return;
}
heartbeatManager.updateLifeline(nodeinfo, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
}
/**
* Convert a CachedBlockList into a DatanodeCommand with a list of blocks.
*

View File

@ -240,6 +240,20 @@ synchronized void updateHeartbeat(final DatanodeDescriptor node,
stats.add(node);
}
synchronized void updateLifeline(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
stats.subtract(node);
// This intentionally calls updateHeartbeatState instead of
// updateHeartbeat, because we don't want to modify the
// heartbeatedSinceRegistration flag. Arrival of a lifeline message does
// not count as arrival of the first heartbeat.
node.updateHeartbeatState(reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
}
synchronized void startDecommission(final DatanodeDescriptor node) {
if (!node.isAlive()) {
LOG.info("Dead node {} is decommissioned immediately.", node);
@ -416,4 +430,4 @@ public void run() {
}
}
}
}
}

View File

@ -120,17 +120,22 @@ void writeUnlock() {
mWriteLock.unlock();
}
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
BPOfferService(List<InetSocketAddress> nnAddrs,
List<InetSocketAddress> lifelineNnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
"Must pass same number of NN addresses and lifeline addresses.");
this.dn = dn;
for (InetSocketAddress addr : nnAddrs) {
this.bpServices.add(new BPServiceActor(addr, this));
for (int i = 0; i < nnAddrs.size(); ++i) {
this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
lifelineNnAddrs.get(i), this));
}
}
void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
void refreshNNList(ArrayList<InetSocketAddress> addrs,
ArrayList<InetSocketAddress> lifelineAddrs) throws IOException {
Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
for (BPServiceActor actor : bpServices) {
oldAddrs.add(actor.getNNSocketAddress());

View File

@ -19,6 +19,7 @@
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -28,6 +29,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@ -40,6 +42,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -103,14 +106,20 @@ static enum RunningState {
final LinkedList<BPServiceActorAction> bpThreadQueue
= new LinkedList<BPServiceActorAction>();
BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
BPOfferService bpos) {
this.bpos = bpos;
this.dn = bpos.getDataNode();
this.nnAddr = nnAddr;
this.lifelineSender = lifelineNnAddr != null ?
new LifelineSender(lifelineNnAddr) : null;
this.initialRegistrationComplete = lifelineNnAddr != null ?
new CountDownLatch(1) : null;
this.dnConf = dn.getDnConf();
this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
prevBlockReportId = ThreadLocalRandom.current().nextLong();
scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
scheduler = new Scheduler(dnConf.heartBeatInterval,
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
}
public DatanodeRegistration getBpRegistration() {
@ -138,6 +147,9 @@ InetSocketAddress getNNSocketAddress() {
return nnAddr;
}
private final CountDownLatch initialRegistrationComplete;
private final LifelineSender lifelineSender;
/**
* Used to inject a spy NN in the unit tests.
*/
@ -151,6 +163,20 @@ DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() {
return bpNamenode;
}
/**
* Used to inject a spy NN in the unit tests.
*/
@VisibleForTesting
void setLifelineNameNode(
DatanodeLifelineProtocolClientSideTranslatorPB dnLifelineProtocol) {
lifelineSender.lifelineNamenode = dnLifelineProtocol;
}
@VisibleForTesting
DatanodeLifelineProtocolClientSideTranslatorPB getLifelineNameNodeProxy() {
return lifelineSender.lifelineNamenode;
}
/**
* Perform the first part of the handshake with the NameNode.
* This calls <code>versionRequest</code> to determine the NN's
@ -420,29 +446,39 @@ void start() {
//Thread is started already
return;
}
bpThread = new Thread(this, formatThreadName());
bpThread = new Thread(this, formatThreadName("heartbeating", nnAddr));
bpThread.setDaemon(true); // needed for JUnit testing
bpThread.start();
if (lifelineSender != null) {
lifelineSender.start();
}
}
private String formatThreadName() {
private String formatThreadName(String action, InetSocketAddress addr) {
Collection<StorageLocation> dataDirs =
DataNode.getStorageLocations(dn.getConf());
return "DataNode: [" + dataDirs.toString() + "] " +
" heartbeating to " + nnAddr;
return "DataNode: [" + dataDirs.toString() + "] " +
action + " to " + addr;
}
//This must be called only by blockPoolManager.
void stop() {
shouldServiceRun = false;
if (lifelineSender != null) {
lifelineSender.stop();
}
if (bpThread != null) {
bpThread.interrupt();
bpThread.interrupt();
}
}
//This must be called only by blockPoolManager
void join() {
try {
if (lifelineSender != null) {
lifelineSender.join();
}
if (bpThread != null) {
bpThread.join();
}
@ -454,6 +490,7 @@ private synchronized void cleanUp() {
shouldServiceRun = false;
IOUtils.cleanup(null, bpNamenode);
IOUtils.cleanup(null, lifelineSender);
bpos.shutdownActor(this);
}
@ -480,7 +517,9 @@ private void offerService() throws Exception {
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+ " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec"
+ "; heartBeatInterval=" + dnConf.heartBeatInterval);
+ "; heartBeatInterval=" + dnConf.heartBeatInterval
+ (lifelineSender != null ?
"; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
long fullBlockReportLeaseId = 0;
//
@ -684,6 +723,9 @@ public void run() {
}
runningState = RunningState.RUNNING;
if (initialRegistrationComplete != null) {
initialRegistrationComplete.countDown();
}
while (shouldRun()) {
try {
@ -797,6 +839,135 @@ Scheduler getScheduler() {
return scheduler;
}
private final class LifelineSender implements Runnable, Closeable {
private final InetSocketAddress lifelineNnAddr;
private Thread lifelineThread;
private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;
public LifelineSender(InetSocketAddress lifelineNnAddr) {
this.lifelineNnAddr = lifelineNnAddr;
}
@Override
public void close() {
stop();
try {
join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
IOUtils.cleanup(null, lifelineNamenode);
}
@Override
public void run() {
// The lifeline RPC depends on registration with the NameNode, so wait for
// initial registration to complete.
while (shouldRun()) {
try {
initialRegistrationComplete.await();
break;
} catch (InterruptedException e) {
// The only way thread interruption can happen while waiting on this
// latch is if the state of the actor has been updated to signal
// shutdown. The next loop's call to shouldRun() will return false,
// and the thread will finish.
Thread.currentThread().interrupt();
}
}
// After initial NameNode registration has completed, execute the main
// loop for sending periodic lifeline RPCs if needed. This is done in a
// second loop to avoid a pointless wait on the above latch in every
// iteration of the main loop.
while (shouldRun()) {
try {
if (lifelineNamenode == null) {
lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr);
}
sendLifelineIfDue();
Thread.sleep(scheduler.getLifelineWaitTime());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IOException e) {
LOG.warn("IOException in LifelineSender for " + BPServiceActor.this,
e);
}
}
LOG.info("LifelineSender for " + BPServiceActor.this + " exiting.");
}
public void start() {
lifelineThread = new Thread(this, formatThreadName("lifeline",
lifelineNnAddr));
lifelineThread.setDaemon(true);
lifelineThread.setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread thread, Throwable t) {
LOG.error(thread + " terminating on unexpected exception", t);
}
});
lifelineThread.start();
}
public void stop() {
if (lifelineThread != null) {
lifelineThread.interrupt();
}
}
public void join() throws InterruptedException {
if (lifelineThread != null) {
lifelineThread.join();
}
}
private void sendLifelineIfDue() throws IOException {
long startTime = scheduler.monotonicNow();
if (!scheduler.isLifelineDue(startTime)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
+ ", because it is not due.");
}
return;
}
if (dn.areHeartbeatsDisabledForTests()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
+ ", because heartbeats are disabled for tests.");
}
return;
}
sendLifeline();
dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime);
scheduler.scheduleNextLifeline(scheduler.monotonicNow());
}
private void sendLifeline() throws IOException {
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending lifeline with " + reports.length + " storage " +
" reports from service actor: " + BPServiceActor.this);
}
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
lifelineNamenode.sendLifeline(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary);
}
}
/**
* Utility class that wraps the timestamp computations for scheduling
* heartbeats and block reports.
@ -811,6 +982,9 @@ static class Scheduler {
@VisibleForTesting
volatile long nextHeartbeatTime = monotonicNow();
@VisibleForTesting
volatile long nextLifelineTime = monotonicNow();
@VisibleForTesting
boolean resetBlockReportTime = true;
@ -818,10 +992,13 @@ static class Scheduler {
new AtomicBoolean(false);
private final long heartbeatIntervalMs;
private final long lifelineIntervalMs;
private final long blockReportIntervalMs;
Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
long blockReportIntervalMs) {
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.lifelineIntervalMs = lifelineIntervalMs;
this.blockReportIntervalMs = blockReportIntervalMs;
}
@ -835,19 +1012,31 @@ static class Scheduler {
// Blockreport.
long scheduleHeartbeat() {
nextHeartbeatTime = monotonicNow();
scheduleNextLifeline(nextHeartbeatTime);
return nextHeartbeatTime;
}
long scheduleNextHeartbeat() {
// Numerical overflow is possible here and is okay.
nextHeartbeatTime = monotonicNow() + heartbeatIntervalMs;
scheduleNextLifeline(nextHeartbeatTime);
return nextHeartbeatTime;
}
long scheduleNextLifeline(long baseTime) {
// Numerical overflow is possible here and is okay.
nextLifelineTime = baseTime + lifelineIntervalMs;
return nextLifelineTime;
}
boolean isHeartbeatDue(long startTime) {
return (nextHeartbeatTime - startTime <= 0);
}
boolean isLifelineDue(long startTime) {
return (nextLifelineTime - startTime <= 0);
}
boolean isBlockReportDue(long curTime) {
return nextBlockReportTime - curTime <= 0;
}
@ -903,6 +1092,10 @@ long getHeartbeatWaitTime() {
return nextHeartbeatTime - monotonicNow();
}
long getLifelineWaitTime() {
return nextLifelineTime - monotonicNow();
}
/**
* Wrapped for testing.
* @return

View File

@ -151,14 +151,18 @@ void refreshNamenodes(Configuration conf)
Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
.getNNServiceRpcAddressesForCluster(conf);
Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = DFSUtil
.getNNLifelineRpcAddressesForCluster(conf);
synchronized (refreshNamenodesLock) {
doRefreshNamenodes(newAddressMap);
doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
}
}
private void doRefreshNamenodes(
Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
Map<String, Map<String, InetSocketAddress>> addrMap,
Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);
Set<String> toRefresh = Sets.newLinkedHashSet();
@ -195,9 +199,19 @@ private void doRefreshNamenodes(
Joiner.on(",").useForNull("<default>").join(toAdd));
for (String nsToAdd : toAdd) {
Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
Map<String, InetSocketAddress> nnIdToLifelineAddr =
lifelineAddrMap.get(nsToAdd);
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToAdd).values());
BPOfferService bpos = createBPOS(addrs);
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList<InetSocketAddress> lifelineAddrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
for (String nnId : nnIdToAddr.keySet()) {
addrs.add(nnIdToAddr.get(nnId));
lifelineAddrs.add(nnIdToLifelineAddr != null ?
nnIdToLifelineAddr.get(nnId) : null);
}
BPOfferService bpos = createBPOS(addrs, lifelineAddrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
@ -227,9 +241,19 @@ private void doRefreshNamenodes(
for (String nsToRefresh : toRefresh) {
BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
Map<String, InetSocketAddress> nnIdToLifelineAddr =
lifelineAddrMap.get(nsToRefresh);
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToRefresh).values());
bpos.refreshNNList(addrs);
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList<InetSocketAddress> lifelineAddrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
for (String nnId : nnIdToAddr.keySet()) {
addrs.add(nnIdToAddr.get(nnId));
lifelineAddrs.add(nnIdToLifelineAddr != null ?
nnIdToLifelineAddr.get(nnId) : null);
}
bpos.refreshNNList(addrs, lifelineAddrs);
}
}
}
@ -237,7 +261,8 @@ private void doRefreshNamenodes(
/**
* Extracted out for test purposes.
*/
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
return new BPOfferService(nnAddrs, dn);
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs,
List<InetSocketAddress> lifelineNnAddrs) {
return new BPOfferService(nnAddrs, lifelineNnAddrs, dn);
}
}

View File

@ -27,6 +27,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@ -87,6 +88,7 @@ public class DNConf {
final long readaheadLength;
final long heartBeatInterval;
private final long lifelineIntervalMs;
final long blockReportInterval;
final long blockReportSplitThreshold;
final long ibrInterval;
@ -185,6 +187,20 @@ public DNConf(Configuration conf) {
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
long confLifelineIntervalMs =
conf.getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
3 * conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT)) * 1000L;
if (confLifelineIntervalMs <= heartBeatInterval) {
confLifelineIntervalMs = 3 * heartBeatInterval;
DataNode.LOG.warn(
String.format("%s must be set to a value greater than %s. " +
"Resetting value to 3 * %s, which is %d milliseconds.",
DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_KEY,
confLifelineIntervalMs));
}
lifelineIntervalMs = confLifelineIntervalMs;
// do we need to sync block file contents to disk when blockfile is closed?
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
@ -338,4 +354,13 @@ public int getTransferSocketSendBufferSize() {
public long getBpReadyTimeout() {
return bpReadyTimeout;
}
/**
* Returns the interval in milliseconds between sending lifeline messages.
*
* @return interval in milliseconds between sending lifeline messages
*/
public long getLifelineIntervalMs() {
return lifelineIntervalMs;
}
}

View File

@ -142,6 +142,7 @@
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
@ -1647,6 +1648,19 @@ DatanodeProtocolClientSideTranslatorPB connectToNN(
return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
}
/**
* Connect to the NN for the lifeline protocol. This is separated out for
* easier testing.
*
* @param lifelineNnAddr address of lifeline RPC server
* @return lifeline RPC proxy
*/
DatanodeLifelineProtocolClientSideTranslatorPB connectToLifelineNN(
InetSocketAddress lifelineNnAddr) throws IOException {
return new DatanodeLifelineProtocolClientSideTranslatorPB(lifelineNnAddr,
conf);
}
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
DatanodeID datanodeid, final Configuration conf, final int socketTimeout,
final boolean connectToDnViaHostname) throws IOException {

View File

@ -107,6 +107,7 @@ public class DataNodeMetrics {
@Metric MutableRate copyBlockOp;
@Metric MutableRate replaceBlockOp;
@Metric MutableRate heartbeats;
@Metric MutableRate lifelines;
@Metric MutableRate blockReports;
@Metric MutableRate incrementalBlockReports;
@Metric MutableRate cacheReports;
@ -199,6 +200,10 @@ public void addHeartbeat(long latency) {
heartbeats.add(latency);
}
public void addLifeline(long latency) {
lifelines.add(latency);
}
public void addBlockReport(long latency) {
blockReports.add(latency);
}

View File

@ -3625,6 +3625,37 @@ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
}
}
/**
* Handles a lifeline message sent by a DataNode. This method updates contact
* information and statistics for the DataNode, so that it doesn't time out.
* Unlike a heartbeat, this method does not dispatch any commands back to the
* DataNode for local execution. This method also cannot request a lease for
* sending a full block report. Lifeline messages are used only as a fallback
* in case something prevents successful delivery of heartbeat messages.
* Therefore, the implementation of this method must remain lightweight
* compared to heartbeat handling. It should avoid lock contention and
* expensive computation.
*
* @param nodeReg registration info for DataNode sending the lifeline
* @param reports storage reports from DataNode
* @param cacheCapacity cache capacity at DataNode
* @param cacheUsed cache used at DataNode
* @param xceiverCount estimated count of transfer threads running at DataNode
* @param xmitsInProgress count of transfers running at DataNode
* @param failedVolumes count of failed volumes at DataNode
* @param volumeFailureSummary info on failed volumes at DataNode
* @throws IOException if there is an error
*/
void handleLifeline(DatanodeRegistration nodeReg, StorageReport[] reports,
long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress,
int failedVolumes, VolumeFailureSummary volumeFailureSummary)
throws IOException {
int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress;
blockManager.getDatanodeManager().handleLifeline(nodeReg, reports,
getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer,
failedVolumes, volumeFailureSummary);
}
/**
* Returns whether or not there were available resources at the last check of
* resources.

View File

@ -19,8 +19,9 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
@ -111,11 +112,14 @@
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
@ -256,6 +260,11 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
BlockingService dnProtoPbService = DatanodeProtocolService
.newReflectiveBlockingService(dnProtoPbTranslator);
DatanodeLifelineProtocolServerSideTranslatorPB lifelineProtoPbTranslator =
new DatanodeLifelineProtocolServerSideTranslatorPB(this);
BlockingService lifelineProtoPbService = DatanodeLifelineProtocolService
.newReflectiveBlockingService(lifelineProtoPbTranslator);
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this);
BlockingService NNPbService = NamenodeProtocolService
@ -371,9 +380,14 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
lifelineRpcAddr.getPort());
int lifelineHandlerCount = conf.getInt(
DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY,
DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT);
DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 0);
if (lifelineHandlerCount <= 0) {
float lifelineHandlerRatio = conf.getFloat(
DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY,
DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT);
lifelineHandlerCount = Math.max(
(int)(handlerCount * lifelineHandlerRatio), 1);
}
lifelineRpcServer = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
@ -384,6 +398,9 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
DFSUtil.addPBProtocol(conf, DatanodeLifelineProtocolPB.class,
lifelineProtoPbService, lifelineRpcServer);
// Update the address with the correct port
InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
@ -1509,6 +1526,17 @@ public NamespaceInfo versionRequest() throws IOException {
return namesystem.getNamespaceInfo();
}
@Override // DatanodeLifelineProtocol
public void sendLifeline(DatanodeRegistration nodeReg, StorageReport[] report,
long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
namesystem.handleLifeline(nodeReg, report, dnCacheCapacity, dnCacheUsed,
xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary);
}
/**
* Verifies the given registration.
*

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol used by a DataNode to send lifeline messages to a NameNode.
*/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
@InterfaceAudience.Private
public interface DatanodeLifelineProtocol {
@Idempotent
void sendLifeline(DatanodeRegistration registration, StorageReport[] reports,
long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException;
}

View File

@ -34,6 +34,7 @@
public interface NamenodeProtocols
extends ClientProtocol,
DatanodeProtocol,
DatanodeLifelineProtocol,
NamenodeProtocol,
RefreshAuthorizationPolicyProtocol,
ReconfigurationProtocol,

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *stable* .proto interface.
*/
option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "DatanodeLifelineProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs.datanodelifeline;
import "DatanodeProtocol.proto";
// The lifeline protocol does not use a new request message type. Instead, it
// reuses the existing heartbeat request message.
// Unlike heartbeats, the response is empty. There is no command dispatch.
message LifelineResponseProto {
}
service DatanodeLifelineProtocolService {
rpc sendLifeline(hadoop.hdfs.datanode.HeartbeatRequestProto)
returns(LifelineResponseProto);
}

View File

@ -704,6 +704,22 @@
<description>Determines datanode heartbeat interval in seconds.</description>
</property>
<property>
<name>dfs.datanode.lifeline.interval.seconds</name>
<value></value>
<description>
Sets the interval in seconds between sending DataNode Lifeline Protocol
messages from the DataNode to the NameNode. The value must be greater than
the value of dfs.heartbeat.interval. If this property is not defined, then
the default behavior is to calculate the interval as 3x the value of
dfs.heartbeat.interval. Note that normal heartbeat processing may cause the
DataNode to postpone sending lifeline messages if they are not required.
Under normal operations with speedy heartbeat processing, it is possible
that no lifeline messages will need to be sent at all. This property has no
effect if dfs.namenode.lifeline.rpc-address is not defined.
</description>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>10</value>
@ -725,14 +741,34 @@
</property>
<property>
<name>dfs.namenode.lifeline.handler.count</name>
<value>1</value>
<name>dfs.namenode.lifeline.handler.ratio</name>
<value>0.10</value>
<description>
Sets number of RPC server threads the NameNode runs for handling the
lifeline RPC server. The default value is 1, because this RPC server
handles only HA health check requests from ZKFC. These are lightweight
requests that run single-threaded from the ZKFC client side. This property
has no effect if dfs.namenode.lifeline.rpc-address is not defined.
A ratio applied to the value of dfs.namenode.handler.count, which then
provides the number of RPC server threads the NameNode runs for handling the
lifeline RPC server. For example, if dfs.namenode.handler.count is 100, and
dfs.namenode.lifeline.handler.factor is 0.10, then the NameNode starts
100 * 0.10 = 10 threads for handling the lifeline RPC server. It is common
to tune the value of dfs.namenode.handler.count as a function of the number
of DataNodes in a cluster. Using this property allows for the lifeline RPC
server handler threads to be tuned automatically without needing to touch a
separate property. Lifeline message processing is lightweight, so it is
expected to require many fewer threads than the main NameNode RPC server.
This property is not used if dfs.namenode.lifeline.handler.count is defined,
which sets an absolute thread count. This property has no effect if
dfs.namenode.lifeline.rpc-address is not defined.
</description>
</property>
<property>
<name>dfs.namenode.lifeline.handler.count</name>
<value></value>
<description>
Sets an absolute number of RPC server threads the NameNode runs for handling
the DataNode Lifeline Protocol and HA health check requests from ZKFC. If
this property is defined, then it overrides the behavior of
dfs.namenode.lifeline.handler.ratio. By default, it is not defined. This
property has no effect if dfs.namenode.lifeline.rpc-address is not defined.
</description>
</property>

View File

@ -26,6 +26,7 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@ -396,7 +397,8 @@ private BPOfferService setupBPOSForNNs(DataNode mockDn,
Mockito.eq(new InetSocketAddress(port)));
}
return new BPOfferService(Lists.newArrayList(nnMap.keySet()), mockDn);
return new BPOfferService(Lists.newArrayList(nnMap.keySet()),
Collections.<InetSocketAddress>nCopies(nnMap.size(), null), mockDn);
}
private void waitForInitialization(final BPOfferService bpos)

View File

@ -51,7 +51,8 @@ public void setupBPM() {
bpm = new BlockPoolManager(mockDN){
@Override
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs,
List<InetSocketAddress> lifelineNnAddrs) {
final int idx = mockIdx++;
doLog("create #" + idx);
final BPOfferService bpos = Mockito.mock(BPOfferService.class);
@ -66,6 +67,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(bpos).refreshNNList(
Mockito.<ArrayList<InetSocketAddress>>any(),
Mockito.<ArrayList<InetSocketAddress>>any());
} catch (IOException e) {
throw new RuntimeException(e);

View File

@ -49,6 +49,7 @@ public class TestBpServiceActorScheduler {
public Timeout timeout = new Timeout(300000);
private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
private final Random random = new Random(System.nanoTime());
@ -166,9 +167,23 @@ public void testScheduleDelayedHeartbeat() {
}
}
@Test
public void testScheduleLifeline() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
scheduler.scheduleNextLifeline(now);
assertFalse(scheduler.isLifelineDue(now));
assertThat(scheduler.getLifelineWaitTime(), is(LIFELINE_INTERVAL_MS));
scheduler.scheduleNextLifeline(now - LIFELINE_INTERVAL_MS);
assertTrue(scheduler.isLifelineDue(now));
assertThat(scheduler.getLifelineWaitTime(), is(0L));
}
}
private Scheduler makeMockScheduler(long now) {
LOG.info("Using now = " + now);
Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS,
LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.nextHeartbeatTime = now;

View File

@ -0,0 +1,300 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
/**
* Test suite covering lifeline protocol handling in the DataNode.
*/
public class TestDataNodeLifeline {
private static final Logger LOG = LoggerFactory.getLogger(
TestDataNodeLifeline.class);
static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
}
@Rule
public Timeout timeout = new Timeout(60000);
private MiniDFSCluster cluster;
private HdfsConfiguration conf;
private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;
private DataNodeMetrics metrics;
private DatanodeProtocolClientSideTranslatorPB namenode;
private FSNamesystem namesystem;
@Before
public void setup() throws Exception {
// Configure cluster with lifeline RPC server enabled, and down-tune
// heartbeat timings to try to force quick dead/stale DataNodes.
conf = new HdfsConfiguration();
conf.setInt(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, 2);
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 6 * 1000);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
namesystem = cluster.getNameNode().getNamesystem();
// Set up spies on RPC proxies so that we can inject failures.
DataNode dn = cluster.getDataNodes().get(0);
metrics = dn.getMetrics();
assertNotNull(metrics);
List<BPOfferService> allBpos = dn.getAllBpOs();
assertNotNull(allBpos);
assertEquals(1, allBpos.size());
BPOfferService bpos = allBpos.get(0);
List<BPServiceActor> allBpsa = bpos.getBPServiceActors();
assertNotNull(allBpsa);
assertEquals(1, allBpsa.size());
final BPServiceActor bpsa = allBpsa.get(0);
assertNotNull(bpsa);
// Lifeline RPC proxy gets created on separate thread, so poll until found.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
if (bpsa.getLifelineNameNodeProxy() != null) {
lifelineNamenode = spy(bpsa.getLifelineNameNodeProxy());
bpsa.setLifelineNameNode(lifelineNamenode);
}
return lifelineNamenode != null;
}
}, 100, 10000);
assertNotNull(bpsa.getNameNodeProxy());
namenode = spy(bpsa.getNameNodeProxy());
bpsa.setNameNode(namenode);
}
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
GenericTestUtils.assertNoThreadsMatching(".*lifeline.*");
}
}
@Test
public void testSendLifelineIfHeartbeatBlocked() throws Exception {
// Run the test for the duration of sending 10 lifeline RPC messages.
int numLifelines = 10;
CountDownLatch lifelinesSent = new CountDownLatch(numLifelines);
// Intercept heartbeat to inject an artificial delay, until all expected
// lifeline RPC messages have been sent.
doAnswer(new LatchAwaitingAnswer<HeartbeatResponse>(lifelinesSent))
.when(namenode).sendHeartbeat(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean());
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
.when(lifelineNamenode).sendLifeline(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class));
// While waiting on the latch for the expected number of lifeline messages,
// poll DataNode tracking information. Thanks to the lifeline, we expect
// that the DataNode always stays alive, and never goes stale or dead.
while (!lifelinesSent.await(1, SECONDS)) {
assertEquals("Expect DataNode to be kept alive by lifeline.", 1,
namesystem.getNumLiveDataNodes());
assertEquals("Expect DataNode not marked dead due to lifeline.", 0,
namesystem.getNumDeadDataNodes());
assertEquals("Expect DataNode not marked stale due to lifeline.", 0,
namesystem.getNumStaleDataNodes());
}
// Verify that we did in fact call the lifeline RPC.
verify(lifelineNamenode, atLeastOnce()).sendLifeline(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class));
// Also verify lifeline call through metrics. We expect at least
// numLifelines, guaranteed by waiting on the latch. There is a small
// possibility of extra lifeline calls depending on timing, so we allow
// slack in the assertion.
assertTrue("Expect metrics to count at least " + numLifelines + " calls.",
getLongCounter("LifelinesNumOps", getMetrics(metrics.name())) >=
numLifelines);
}
@Test
public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception {
// Run the test for the duration of sending 10 heartbeat RPC messages.
int numHeartbeats = 10;
CountDownLatch heartbeatsSent = new CountDownLatch(numHeartbeats);
// Intercept heartbeat to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<HeartbeatResponse>(heartbeatsSent))
.when(namenode).sendHeartbeat(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean());
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always
// stays alive, and never goes stale or dead.
while (!heartbeatsSent.await(1, SECONDS)) {
assertEquals("Expect DataNode to be kept alive by lifeline.", 1,
namesystem.getNumLiveDataNodes());
assertEquals("Expect DataNode not marked dead due to lifeline.", 0,
namesystem.getNumDeadDataNodes());
assertEquals("Expect DataNode not marked stale due to lifeline.", 0,
namesystem.getNumStaleDataNodes());
}
// Verify that we did not call the lifeline RPC.
verify(lifelineNamenode, never()).sendLifeline(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class));
// Also verify no lifeline calls through metrics.
assertEquals("Expect metrics to count no lifeline calls.", 0,
getLongCounter("LifelinesNumOps", getMetrics(metrics.name())));
}
/**
* Waits on a {@link CountDownLatch} before calling through to the method.
*/
private final class LatchAwaitingAnswer<T> implements Answer<T> {
private final CountDownLatch latch;
public LatchAwaitingAnswer(CountDownLatch latch) {
this.latch = latch;
}
@Override
@SuppressWarnings("unchecked")
public T answer(InvocationOnMock invocation)
throws Throwable {
LOG.info("Awaiting, remaining latch count is {}.", latch.getCount());
latch.await();
return (T)invocation.callRealMethod();
}
}
/**
* Counts on a {@link CountDownLatch} after each call through to the method.
*/
private final class LatchCountingAnswer<T> implements Answer<T> {
private final CountDownLatch latch;
public LatchCountingAnswer(CountDownLatch latch) {
this.latch = latch;
}
@Override
@SuppressWarnings("unchecked")
public T answer(InvocationOnMock invocation)
throws Throwable {
T result = (T)invocation.callRealMethod();
latch.countDown();
LOG.info("Countdown, remaining latch count is {}.", latch.getCount());
return result;
}
}
}

View File

@ -61,7 +61,7 @@ public void setUp() throws IOException {
BPOfferService mockBPOS = mock(BPOfferService.class);
doReturn(mockDN).when(mockBPOS).getDataNode();
actor = new BPServiceActor(INVALID_ADDR, mockBPOS);
actor = new BPServiceActor(INVALID_ADDR, null, mockBPOS);
fakeNsInfo = mock(NamespaceInfo.class);
// Return a a good software version.