HDFS-3171. The DatanodeID "name" field is overloaded. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1308014 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-04-01 03:41:41 +00:00
parent eacd9abf50
commit 0663dbaac0
24 changed files with 171 additions and 163 deletions

View File

@ -290,6 +290,8 @@ Release 2.0.0 - UNRELEASED
HDFS-3172. dfs.upgrade.permission is dead code. (eli) HDFS-3172. dfs.upgrade.permission is dead code. (eli)
HDFS-3171. The DatanodeID "name" field is overloaded. (eli)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-3024. Improve performance of stringification in addStoredBlock (todd) HDFS-3024. Improve performance of stringification in addStoredBlock (todd)

View File

@ -183,6 +183,7 @@ public String toString() {
*/ */
public void updateRegInfo(DatanodeID nodeReg) { public void updateRegInfo(DatanodeID nodeReg) {
name = nodeReg.getName(); name = nodeReg.getName();
hostName = nodeReg.getHostName();
infoPort = nodeReg.getInfoPort(); infoPort = nodeReg.getInfoPort();
ipcPort = nodeReg.getIpcPort(); ipcPort = nodeReg.getIpcPort();
} }

View File

@ -606,8 +606,8 @@ public static DatanodeRegistrationProto convert(
DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
.newBuilder(); .newBuilder();
return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration)) return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration))
.setStorageInfo(PBHelper.convert(registration.storageInfo)) .setStorageInfo(PBHelper.convert(registration.getStorageInfo()))
.setKeys(PBHelper.convert(registration.exportedKeys)).build(); .setKeys(PBHelper.convert(registration.getExportedKeys())).build();
} }
public static DatanodeRegistration convert(DatanodeRegistrationProto proto) { public static DatanodeRegistration convert(DatanodeRegistrationProto proto) {

View File

@ -337,7 +337,7 @@ private void addDatanode(final DatanodeDescriptor node) {
} }
/** Physically remove node from datanodeMap. */ /** Physically remove node from datanodeMap. */
private void wipeDatanode(final DatanodeID node) throws IOException { private void wipeDatanode(final DatanodeID node) {
final String key = node.getStorageID(); final String key = node.getStorageID();
synchronized (datanodeMap) { synchronized (datanodeMap) {
host2DatanodeMap.remove(datanodeMap.remove(key)); host2DatanodeMap.remove(datanodeMap.remove(key));
@ -481,8 +481,7 @@ private static boolean checkInList(final DatanodeID node,
/** /**
* Decommission the node if it is in exclude list. * Decommission the node if it is in exclude list.
*/ */
private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) {
throws IOException {
// If the registered node is in exclude list, then decommission it // If the registered node is in exclude list, then decommission it
if (inExcludedHostsList(nodeReg, ipAddr)) { if (inExcludedHostsList(nodeReg, ipAddr)) {
startDecommission(nodeReg); startDecommission(nodeReg);
@ -506,7 +505,7 @@ boolean checkDecommissionState(DatanodeDescriptor node) {
} }
/** Start decommissioning the specified datanode. */ /** Start decommissioning the specified datanode. */
private void startDecommission(DatanodeDescriptor node) throws IOException { private void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
LOG.info("Start Decommissioning node " + node.getName() + " with " + LOG.info("Start Decommissioning node " + node.getName() + " with " +
node.numBlocks() + " blocks."); node.numBlocks() + " blocks.");
@ -519,7 +518,7 @@ private void startDecommission(DatanodeDescriptor node) throws IOException {
} }
/** Stop decommissioning the specified datanodes. */ /** Stop decommissioning the specified datanodes. */
void stopDecommission(DatanodeDescriptor node) throws IOException { void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) { if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.info("Stop Decommissioning node " + node.getName()); LOG.info("Stop Decommissioning node " + node.getName());
heartbeatManager.stopDecommission(node); heartbeatManager.stopDecommission(node);
@ -545,8 +544,16 @@ private String newStorageID() {
return newID; return newID;
} }
public void registerDatanode(DatanodeRegistration nodeReg /**
) throws IOException { * Register the given datanode with the namenode. NB: the given
* registration is mutated and given back to the datanode.
*
* @param nodeReg the datanode registration
* @throws DisallowedDatanodeException if the registration request is
* denied because the datanode does not match includes/excludes
*/
public void registerDatanode(DatanodeRegistration nodeReg)
throws DisallowedDatanodeException {
String dnAddress = Server.getRemoteAddress(); String dnAddress = Server.getRemoteAddress();
if (dnAddress == null) { if (dnAddress == null) {
// Mostly called inside an RPC. // Mostly called inside an RPC.
@ -560,16 +567,10 @@ public void registerDatanode(DatanodeRegistration nodeReg
throw new DisallowedDatanodeException(nodeReg); throw new DisallowedDatanodeException(nodeReg);
} }
String hostName = nodeReg.getHost(); // Update "name" with the IP address of the RPC request that
// is registering this datanode.
// update the datanode's name with ip:port nodeReg.setName(dnAddress + ":" + nodeReg.getPort());
DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(), nodeReg.setExportedKeys(blockManager.getBlockKeys());
hostName,
nodeReg.getStorageID(),
nodeReg.getInfoPort(),
nodeReg.getIpcPort());
nodeReg.updateRegInfo(dnReg);
nodeReg.exportedKeys = blockManager.getBlockKeys();
NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: " NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
+ "node registration from " + nodeReg.getName() + "node registration from " + nodeReg.getName()
@ -617,7 +618,6 @@ nodes with its data cleared (or user can just remove the StorageID
// update cluster map // update cluster map
getNetworkTopology().remove(nodeS); getNetworkTopology().remove(nodeS);
nodeS.updateRegInfo(nodeReg); nodeS.updateRegInfo(nodeReg);
nodeS.setHostName(hostName);
nodeS.setDisallowed(false); // Node is in the include list nodeS.setDisallowed(false); // Node is in the include list
// resolve network location // resolve network location
@ -635,7 +635,7 @@ nodes with its data cleared (or user can just remove the StorageID
// this data storage has never been registered // this data storage has never been registered
// it is either empty or was created by pre-storageID version of DFS // it is either empty or was created by pre-storageID version of DFS
nodeReg.setStorageID(newStorageID()); nodeReg.setStorageID(newStorageID());
if(NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug( NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: " "BLOCK* NameSystem.registerDatanode: "
+ "new storageID " + nodeReg.getStorageID() + " assigned."); + "new storageID " + nodeReg.getStorageID() + " assigned.");

View File

@ -325,10 +325,10 @@ synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOExcep
void registrationSucceeded(BPServiceActor bpServiceActor, void registrationSucceeded(BPServiceActor bpServiceActor,
DatanodeRegistration reg) throws IOException { DatanodeRegistration reg) throws IOException {
if (bpRegistration != null) { if (bpRegistration != null) {
checkNSEquality(bpRegistration.storageInfo.getNamespaceID(), checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
reg.storageInfo.getNamespaceID(), "namespace ID"); reg.getStorageInfo().getNamespaceID(), "namespace ID");
checkNSEquality(bpRegistration.storageInfo.getClusterID(), checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
reg.storageInfo.getClusterID(), "cluster ID"); reg.getStorageInfo().getClusterID(), "cluster ID");
} else { } else {
bpRegistration = reg; bpRegistration = reg;
} }

View File

@ -602,7 +602,7 @@ void register() throws IOException {
while (shouldRun()) { while (shouldRun()) {
try { try {
// Use returned registration from namenode with updated machine name. // Use returned registration from namenode with updated fields
bpRegistration = bpNamenode.registerDatanode(bpRegistration); bpRegistration = bpNamenode.registerDatanode(bpRegistration);
break; break;
} catch(SocketTimeoutException e) { // namenode is busy } catch(SocketTimeoutException e) { // namenode is busy

View File

@ -244,9 +244,10 @@ public static InetSocketAddress createSocketAddr(String target) {
private DataStorage storage = null; private DataStorage storage = null;
private HttpServer infoServer = null; private HttpServer infoServer = null;
DataNodeMetrics metrics; DataNodeMetrics metrics;
private InetSocketAddress selfAddr; private InetSocketAddress streamingAddr;
private volatile String hostName; // Host name of this datanode private String hostName;
private DatanodeID id;
boolean isBlockTokenEnabled; boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager; BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@ -288,6 +289,7 @@ public static InetSocketAddress createSocketAddr(String target) {
.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY); .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
try { try {
hostName = getHostName(conf); hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName);
startDataNode(conf, dataDirs, resources); startDataNode(conf, dataDirs, resources);
} catch (IOException ie) { } catch (IOException ie) {
shutdown(); shutdown();
@ -305,16 +307,25 @@ private synchronized void setClusterId(final String nsCid, final String bpid
clusterId = nsCid; clusterId = nsCid;
} }
/**
* Returns the hostname for this datanode. If the hostname is not
* explicitly configured in the given config, then it is determined
* via the DNS class.
*
* @param config
* @return the hostname (NB: may not be a FQDN)
* @throws UnknownHostException if the dfs.datanode.dns.interface
* option is used and the hostname can not be determined
*/
private static String getHostName(Configuration config) private static String getHostName(Configuration config)
throws UnknownHostException { throws UnknownHostException {
// use configured nameserver & interface to get local hostname
String name = config.get(DFS_DATANODE_HOST_NAME_KEY); String name = config.get(DFS_DATANODE_HOST_NAME_KEY);
if (name == null) { if (name == null) {
name = DNS name = DNS.getDefaultHost(
.getDefaultHost(config.get(DFS_DATANODE_DNS_INTERFACE_KEY, config.get(DFS_DATANODE_DNS_INTERFACE_KEY,
DFS_DATANODE_DNS_INTERFACE_DEFAULT), config.get( DFS_DATANODE_DNS_INTERFACE_DEFAULT),
DFS_DATANODE_DNS_NAMESERVER_KEY, config.get(DFS_DATANODE_DNS_NAMESERVER_KEY,
DFS_DATANODE_DNS_NAMESERVER_DEFAULT)); DFS_DATANODE_DNS_NAMESERVER_DEFAULT));
} }
return name; return name;
} }
@ -485,23 +496,22 @@ private synchronized void shutdownDirectoryScanner() {
} }
private void initDataXceiver(Configuration conf) throws IOException { private void initDataXceiver(Configuration conf) throws IOException {
InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf);
// find free port or use privileged port provided // find free port or use privileged port provided
ServerSocket ss; ServerSocket ss;
if(secureResources == null) { if (secureResources == null) {
InetSocketAddress addr = DataNode.getStreamingAddr(conf);
ss = (dnConf.socketWriteTimeout > 0) ? ss = (dnConf.socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket(); ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(ss, streamingAddr, 0); Server.bind(ss, addr, 0);
} else { } else {
ss = secureResources.getStreamingSocket(); ss = secureResources.getStreamingSocket();
} }
ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
// adjust machine name with the actual port
int tmpPort = ss.getLocalPort(); streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(), ss.getLocalPort());
tmpPort);
LOG.info("Opened streaming server at " + selfAddr); LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer"); this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup, this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(ss, conf, this)); new DataXceiverServer(ss, conf, this));
@ -646,7 +656,7 @@ void startDataNode(Configuration conf,
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager(); this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
initIpcServer(conf); initIpcServer(conf);
metrics = DataNodeMetrics.create(conf, getMachineName()); metrics = DataNodeMetrics.create(conf, getDisplayName());
blockPoolManager = new BlockPoolManager(this); blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(conf); blockPoolManager.refreshNamenodes(conf);
@ -657,14 +667,16 @@ void startDataNode(Configuration conf,
* @param nsInfo the namespace info from the first part of the NN handshake * @param nsInfo the namespace info from the first part of the NN handshake
*/ */
DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) { DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
DatanodeRegistration bpRegistration = createUnknownBPRegistration(); DatanodeRegistration bpRegistration = new DatanodeRegistration(getXferAddr());
String blockPoolId = nsInfo.getBlockPoolID(); bpRegistration.setInfoPort(getInfoPort());
bpRegistration.setIpcPort(getIpcPort());
bpRegistration.setHostName(hostName);
bpRegistration.setStorageID(getStorageId()); bpRegistration.setStorageID(getStorageId());
StorageInfo storageInfo = storage.getBPStorage(blockPoolId);
StorageInfo storageInfo = storage.getBPStorage(nsInfo.getBlockPoolID());
if (storageInfo == null) { if (storageInfo == null) {
// it's null in the case of SimulatedDataSet // it's null in the case of SimulatedDataSet
bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION; bpRegistration.getStorageInfo().layoutVersion = HdfsConstants.LAYOUT_VERSION;
bpRegistration.setStorageInfo(nsInfo); bpRegistration.setStorageInfo(nsInfo);
} else { } else {
bpRegistration.setStorageInfo(storageInfo); bpRegistration.setStorageInfo(storageInfo);
@ -679,13 +691,14 @@ DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
* Also updates the block pool's state in the secret manager. * Also updates the block pool's state in the secret manager.
*/ */
synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration, synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
String blockPoolId) String blockPoolId) throws IOException {
throws IOException { // Set the ID if we haven't already
hostName = bpRegistration.getHost(); if (null == id) {
id = bpRegistration;
}
if (storage.getStorageID().equals("")) { if (storage.getStorageID().equals("")) {
// This is a fresh datanode -- take the storage ID provided by the // This is a fresh datanode, persist the NN-provided storage ID
// NN and persist it.
storage.setStorageID(bpRegistration.getStorageID()); storage.setStorageID(bpRegistration.getStorageID());
storage.writeAll(); storage.writeAll();
LOG.info("New storage id " + bpRegistration.getStorageID() LOG.info("New storage id " + bpRegistration.getStorageID()
@ -708,7 +721,7 @@ synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
*/ */
private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration, private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration,
String blockPoolId) throws IOException { String blockPoolId) throws IOException {
ExportedBlockKeys keys = bpRegistration.exportedKeys; ExportedBlockKeys keys = bpRegistration.getExportedKeys();
isBlockTokenEnabled = keys.isBlockTokenEnabled(); isBlockTokenEnabled = keys.isBlockTokenEnabled();
// TODO should we check that all federated nns are either enabled or // TODO should we check that all federated nns are either enabled or
// disabled? // disabled?
@ -728,8 +741,8 @@ private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistrat
} }
blockPoolTokenSecretManager.setKeys(blockPoolId, blockPoolTokenSecretManager.setKeys(blockPoolId,
bpRegistration.exportedKeys); bpRegistration.getExportedKeys());
bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS; bpRegistration.setExportedKeys(ExportedBlockKeys.DUMMY_KEYS);
} }
/** /**
@ -783,18 +796,6 @@ void initBlockPool(BPOfferService bpos) throws IOException {
data.addBlockPool(nsInfo.getBlockPoolID(), conf); data.addBlockPool(nsInfo.getBlockPoolID(), conf);
} }
/**
* Create a DatanodeRegistration object with no valid StorageInfo.
* This is used when reporting an error during handshake - ie
* before we can load any specific block pool.
*/
private DatanodeRegistration createUnknownBPRegistration() {
DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
reg.setInfoPort(infoServer.getPort());
reg.setIpcPort(getIpcPort());
return reg;
}
BPOfferService[] getAllBpOs() { BPOfferService[] getAllBpOs() {
return blockPoolManager.getAllNamenodeThreads(); return blockPoolManager.getAllNamenodeThreads();
} }
@ -844,8 +845,8 @@ private void registerMXBean() {
MBeans.register("DataNode", "DataNodeInfo", this); MBeans.register("DataNode", "DataNodeInfo", this);
} }
int getPort() { int getXferPort() {
return selfAddr.getPort(); return streamingAddr.getPort();
} }
String getStorageId() { String getStorageId() {
@ -853,14 +854,35 @@ String getStorageId() {
} }
/** /**
* Get host:port with host set to Datanode host and port set to the * @return name useful for logging
* port {@link DataXceiver} is serving.
* @return host:port string
*/ */
public String getMachineName() { public String getDisplayName() {
return hostName + ":" + getPort(); // NB: our DatanodeID may not be set yet
return hostName + ":" + getIpcPort();
} }
/**
* NB: The datanode can perform data transfer on the streaming
* address however clients are given the IPC IP address for data
* transfer, and that may be be a different address.
*
* @return socket address for data transfer
*/
public InetSocketAddress getXferAddress() {
return streamingAddr;
}
/**
* @return the IP:port to report to the NN for data transfer
*/
private String getXferAddr() {
return streamingAddr.getAddress().getHostAddress() + ":" + getXferPort();
}
/**
* @return the datanode's IPC port
*/
@VisibleForTesting
public int getIpcPort() { public int getIpcPort() {
return ipcServer.getListenerAddress().getPort(); return ipcServer.getListenerAddress().getPort();
} }
@ -880,25 +902,6 @@ DatanodeRegistration getDNRegistrationForBP(String bpid)
return bpos.bpRegistration; return bpos.bpRegistration;
} }
/**
* get BP registration by machine and port name (host:port)
* @param mName - the name that the NN used
* @return BP registration
* @throws IOException
*/
DatanodeRegistration getDNRegistrationByMachineName(String mName) {
// TODO: all the BPs should have the same name as each other, they all come
// from getName() here! and the use cases only are in tests where they just
// call with getName(). So we could probably just make this method return
// the first BPOS's registration. See HDFS-2609.
BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads();
for (BPOfferService bpos : bposArray) {
if(bpos.bpRegistration.getName().equals(mName))
return bpos.bpRegistration;
}
return null;
}
/** /**
* Creates either NIO or regular depending on socketWriteTimeout. * Creates either NIO or regular depending on socketWriteTimeout.
*/ */
@ -937,10 +940,6 @@ public InterDatanodeProtocol run() throws IOException {
} }
} }
public InetSocketAddress getSelfAddr() {
return selfAddr;
}
DataNodeMetrics getMetrics() { DataNodeMetrics getMetrics() {
return metrics; return metrics;
} }
@ -1632,7 +1631,7 @@ static ArrayList<File> getDataDirsFromURIs(Collection<URI> dataDirs,
@Override @Override
public String toString() { public String toString() {
return "DataNode{data=" + data + ", localName='" + getMachineName() return "DataNode{data=" + data + ", localName='" + getDisplayName()
+ "', storageID='" + getStorageId() + "', xmitsInProgress=" + "', storageID='" + getStorageId() + "', xmitsInProgress="
+ xmitsInProgress.get() + "}"; + xmitsInProgress.get() + "}";
} }
@ -1998,7 +1997,6 @@ private static void logRecoverBlock(String who,
+ ", targets=[" + msg + "])"); + ", targets=[" + msg + "])");
} }
// ClientDataNodeProtocol implementation
@Override // ClientDataNodeProtocol @Override // ClientDataNodeProtocol
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
checkWriteAccess(block); checkWriteAccess(block);
@ -2076,8 +2074,7 @@ void finalizeUpgradeForPool(String blockPoolId) throws IOException {
storage.finalizeUpgrade(blockPoolId); storage.finalizeUpgrade(blockPoolId);
} }
// Determine a Datanode's streaming address static InetSocketAddress getStreamingAddr(Configuration conf) {
public static InetSocketAddress getStreamingAddr(Configuration conf) {
return NetUtils.createSocketAddr( return NetUtils.createSocketAddr(
conf.get(DFS_DATANODE_ADDRESS_KEY, DFS_DATANODE_ADDRESS_DEFAULT)); conf.get(DFS_DATANODE_ADDRESS_KEY, DFS_DATANODE_ADDRESS_DEFAULT));
} }
@ -2099,8 +2096,11 @@ public String getHttpPort(){
return this.getConf().get("dfs.datanode.info.port"); return this.getConf().get("dfs.datanode.info.port");
} }
public int getInfoPort(){ /**
return this.infoServer.getPort(); * @return the datanode's http port
*/
public int getInfoPort() {
return infoServer.getPort();
} }
/** /**
@ -2142,7 +2142,7 @@ public void refreshNamenodes(Configuration conf) throws IOException {
blockPoolManager.refreshNamenodes(conf); blockPoolManager.refreshNamenodes(conf);
} }
@Override //ClientDatanodeProtocol @Override // ClientDatanodeProtocol
public void refreshNamenodes() throws IOException { public void refreshNamenodes() throws IOException {
conf = new Configuration(); conf = new Configuration();
refreshNamenodes(conf); refreshNamenodes(conf);
@ -2206,8 +2206,7 @@ public boolean isDatanodeFullyStarted() {
@VisibleForTesting @VisibleForTesting
public DatanodeID getDatanodeId() { public DatanodeID getDatanodeId() {
return new DatanodeID(getMachineName(), hostName, getStorageId(), return id;
infoServer.getPort(), getIpcPort());
} }
/** /**

View File

@ -194,7 +194,7 @@ synchronized void recoverTransitionRead(DataNode datanode,
} }
// make sure we have storage id set - if not - generate new one // make sure we have storage id set - if not - generate new one
createStorageID(datanode.getPort()); createStorageID(datanode.getXferPort());
// 3. Update all storages. Some of them might have just been formatted. // 3. Update all storages. Some of them might have just been formatted.
this.writeAll(); this.writeAll();

View File

@ -168,13 +168,13 @@ public void run() {
++opsProcessed; ++opsProcessed;
} while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0); } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
} catch (Throwable t) { } catch (Throwable t) {
LOG.error(datanode.getMachineName() + ":DataXceiver error processing " + LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
((op == null) ? "unknown" : op.name()) + " operation " + ((op == null) ? "unknown" : op.name()) + " operation " +
" src: " + remoteAddress + " src: " + remoteAddress +
" dest: " + localAddress, t); " dest: " + localAddress, t);
} finally { } finally {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getMachineName() + ":Number of active connections is: " LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
+ datanode.getXceiverCount()); + datanode.getXceiverCount());
} }
updateCurrentThreadName("Cleaning up"); updateCurrentThreadName("Cleaning up");

View File

@ -152,11 +152,11 @@ public void run() {
// another thread closed our listener socket - that's expected during shutdown, // another thread closed our listener socket - that's expected during shutdown,
// but not in other circumstances // but not in other circumstances
if (datanode.shouldRun) { if (datanode.shouldRun) {
LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ace); LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
} }
} catch (IOException ie) { } catch (IOException ie) {
IOUtils.closeSocket(s); IOUtils.closeSocket(s);
LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ie); LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
} catch (OutOfMemoryError ie) { } catch (OutOfMemoryError ie) {
IOUtils.closeSocket(s); IOUtils.closeSocket(s);
// DataNode can run out of memory if there is too many transfers. // DataNode can run out of memory if there is too many transfers.
@ -169,7 +169,7 @@ public void run() {
// ignore // ignore
} }
} catch (Throwable te) { } catch (Throwable te) {
LOG.error(datanode.getMachineName() LOG.error(datanode.getDisplayName()
+ ":DataXceiverServer: Exiting due to: ", te); + ":DataXceiverServer: Exiting due to: ", te);
datanode.shouldRun = false; datanode.shouldRun = false;
} }
@ -177,7 +177,7 @@ public void run() {
try { try {
ss.close(); ss.close();
} catch (IOException ie) { } catch (IOException ie) {
LOG.warn(datanode.getMachineName() LOG.warn(datanode.getDisplayName()
+ " :DataXceiverServer: close exception", ie); + " :DataXceiverServer: close exception", ie);
} }
} }
@ -188,7 +188,7 @@ void kill() {
try { try {
this.ss.close(); this.ss.close();
} catch (IOException ie) { } catch (IOException ie) {
LOG.warn(datanode.getMachineName() + ":DataXceiverServer.kill(): ", ie); LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
} }
// close all the sockets that were accepted earlier // close all the sockets that were accepted earlier

View File

@ -55,7 +55,7 @@ synchronized void initializeUpgrade(NamespaceInfo nsInfo) throws IOException {
if( ! super.initializeUpgrade()) if( ! super.initializeUpgrade())
return; // distr upgrade is not needed return; // distr upgrade is not needed
DataNode.LOG.info("\n Distributed upgrade for DataNode " DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.getMachineName() + dataNode.getDisplayName()
+ " version " + getUpgradeVersion() + " to current LV " + " version " + getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is initialized."); + HdfsConstants.LAYOUT_VERSION + " is initialized.");
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first(); UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
@ -113,7 +113,7 @@ public synchronized boolean startUpgrade() throws IOException {
upgradeDaemon = new Daemon(curUO); upgradeDaemon = new Daemon(curUO);
upgradeDaemon.start(); upgradeDaemon.start();
DataNode.LOG.info("\n Distributed upgrade for DataNode " DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.getMachineName() + dataNode.getDisplayName()
+ " version " + getUpgradeVersion() + " to current LV " + " version " + getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is started."); + HdfsConstants.LAYOUT_VERSION + " is started.");
return true; return true;
@ -128,7 +128,7 @@ synchronized void processUpgradeCommand(UpgradeCommand command
if(startUpgrade()) // upgrade started if(startUpgrade()) // upgrade started
return; return;
throw new IOException( throw new IOException(
"Distributed upgrade for DataNode " + dataNode.getMachineName() "Distributed upgrade for DataNode " + dataNode.getDisplayName()
+ " version " + getUpgradeVersion() + " to current LV " + " version " + getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " cannot be started. " + HdfsConstants.LAYOUT_VERSION + " cannot be started. "
+ "The upgrade object is not defined."); + "The upgrade object is not defined.");
@ -143,7 +143,7 @@ public synchronized void completeUpgrade() throws IOException {
currentUpgrades = null; currentUpgrades = null;
upgradeDaemon = null; upgradeDaemon = null;
DataNode.LOG.info("\n Distributed upgrade for DataNode " DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.getMachineName() + dataNode.getDisplayName()
+ " version " + getUpgradeVersion() + " to current LV " + " version " + getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is complete."); + HdfsConstants.LAYOUT_VERSION + " is complete.");
} }

View File

@ -80,9 +80,8 @@ public interface DatanodeProtocol {
* *
* @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem#registerDatanode(DatanodeRegistration) * @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem#registerDatanode(DatanodeRegistration)
* @param registration datanode registration information * @param registration datanode registration information
* @return updated {@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration}, which contains * @return the given {@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration} with
* new storageID if the datanode did not have one and * updated registration information
* registration ID for further communication.
*/ */
public DatanodeRegistration registerDatanode(DatanodeRegistration registration public DatanodeRegistration registerDatanode(DatanodeRegistration registration
) throws IOException; ) throws IOException;

View File

@ -49,8 +49,8 @@ public class DatanodeRegistration extends DatanodeID
}); });
} }
public StorageInfo storageInfo; private StorageInfo storageInfo;
public ExportedBlockKeys exportedKeys; private ExportedBlockKeys exportedKeys;
/** /**
* Default constructor. * Default constructor.
@ -84,6 +84,18 @@ public void setStorageInfo(StorageInfo storage) {
this.storageInfo = new StorageInfo(storage); this.storageInfo = new StorageInfo(storage);
} }
public StorageInfo getStorageInfo() {
return storageInfo;
}
public void setExportedKeys(ExportedBlockKeys keys) {
this.exportedKeys = keys;
}
public ExportedBlockKeys getExportedKeys() {
return exportedKeys;
}
@Override // NodeRegistration @Override // NodeRegistration
public int getVersion() { public int getVersion() {
return storageInfo.getLayoutVersion(); return storageInfo.getLayoutVersion();

View File

@ -1041,9 +1041,9 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
// hadoop.security.token.service.use_ip=true // hadoop.security.token.service.use_ip=true
//since the HDFS does things based on IP:port, we need to add the mapping //since the HDFS does things based on IP:port, we need to add the mapping
//for IP:port to rackId //for IP:port to rackId
String ipAddr = dn.getSelfAddr().getAddress().getHostAddress(); String ipAddr = dn.getXferAddress().getAddress().getHostAddress();
if (racks != null) { if (racks != null) {
int port = dn.getSelfAddr().getPort(); int port = dn.getXferAddress().getPort();
LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
" to rack " + racks[i-curDatanodesNum]); " to rack " + racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(ipAddr + ":" + port, StaticMapping.addNodeToRack(ipAddr + ":" + port,
@ -1422,7 +1422,7 @@ public synchronized DataNodeProperties stopDataNode(int i) {
DataNodeProperties dnprop = dataNodes.remove(i); DataNodeProperties dnprop = dataNodes.remove(i);
DataNode dn = dnprop.datanode; DataNode dn = dnprop.datanode;
LOG.info("MiniDFSCluster Stopping DataNode " + LOG.info("MiniDFSCluster Stopping DataNode " +
dn.getMachineName() + dn.getDisplayName() +
" from a total of " + (dataNodes.size() + 1) + " from a total of " + (dataNodes.size() + 1) +
" datanodes."); " datanodes.");
dn.shutdown(); dn.shutdown();
@ -1433,16 +1433,13 @@ public synchronized DataNodeProperties stopDataNode(int i) {
/* /*
* Shutdown a datanode by name. * Shutdown a datanode by name.
*/ */
public synchronized DataNodeProperties stopDataNode(String name) { public synchronized DataNodeProperties stopDataNode(String dnName) {
int i; int i;
for (i = 0; i < dataNodes.size(); i++) { for (i = 0; i < dataNodes.size(); i++) {
DataNode dn = dataNodes.get(i).datanode; DataNode dn = dataNodes.get(i).datanode;
// get BP registration LOG.info("DN name=" + dnName + " found DN=" + dn +
DatanodeRegistration dnR = " with name=" + dn.getDisplayName());
DataNodeTestUtils.getDNRegistrationByMachineName(dn, name); if (dnName.equals(dn.getDatanodeId().getName())) {
LOG.info("for name=" + name + " found bp=" + dnR +
"; with dnMn=" + dn.getMachineName());
if(dnR != null) {
break; break;
} }
} }
@ -1472,9 +1469,9 @@ public synchronized boolean restartDataNode(DataNodeProperties dnprop,
String[] args = dnprop.dnArgs; String[] args = dnprop.dnArgs;
Configuration newconf = new HdfsConfiguration(conf); // save cloned config Configuration newconf = new HdfsConfiguration(conf); // save cloned config
if (keepPort) { if (keepPort) {
InetSocketAddress addr = dnprop.datanode.getSelfAddr(); InetSocketAddress addr = dnprop.datanode.getXferAddress();
conf.set(DFS_DATANODE_ADDRESS_KEY, addr.getAddress().getHostAddress() + ":" conf.set(DFS_DATANODE_ADDRESS_KEY,
+ addr.getPort()); addr.getAddress().getHostAddress() + ":" + addr.getPort());
} }
dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf), dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf),
newconf, args)); newconf, args));

View File

@ -158,7 +158,7 @@ public void testSocketCache() throws IOException {
testFile.toString(), 0, FILE_SIZE) testFile.toString(), 0, FILE_SIZE)
.getLocatedBlocks().get(0); .getLocatedBlocks().get(0);
DataNode dn = util.getDataNode(block); DataNode dn = util.getDataNode(block);
InetSocketAddress dnAddr = dn.getSelfAddr(); InetSocketAddress dnAddr = dn.getXferAddress();
// Make some sockets to the DN // Make some sockets to the DN
Socket[] dnSockets = new Socket[CACHE_SIZE]; Socket[] dnSockets = new Socket[CACHE_SIZE];

View File

@ -50,7 +50,7 @@ public void testDFSAddressConfig() throws IOException {
ArrayList<DataNode> dns = cluster.getDataNodes(); ArrayList<DataNode> dns = cluster.getDataNodes();
DataNode dn = dns.get(0); DataNode dn = dns.get(0);
String selfSocketAddr = dn.getSelfAddr().toString(); String selfSocketAddr = dn.getXferAddress().toString();
System.out.println("DN Self Socket Addr == " + selfSocketAddr); System.out.println("DN Self Socket Addr == " + selfSocketAddr);
assertTrue(selfSocketAddr.contains("/127.0.0.1:")); assertTrue(selfSocketAddr.contains("/127.0.0.1:"));
@ -75,7 +75,7 @@ public void testDFSAddressConfig() throws IOException {
dns = cluster.getDataNodes(); dns = cluster.getDataNodes();
dn = dns.get(0); dn = dns.get(0);
selfSocketAddr = dn.getSelfAddr().toString(); selfSocketAddr = dn.getXferAddress().toString();
System.out.println("DN Self Socket Addr == " + selfSocketAddr); System.out.println("DN Self Socket Addr == " + selfSocketAddr);
// assert that default self socket address is 127.0.0.1 // assert that default self socket address is 127.0.0.1
assertTrue(selfSocketAddr.contains("/127.0.0.1:")); assertTrue(selfSocketAddr.contains("/127.0.0.1:"));
@ -100,7 +100,7 @@ public void testDFSAddressConfig() throws IOException {
dns = cluster.getDataNodes(); dns = cluster.getDataNodes();
dn = dns.get(0); dn = dns.get(0);
selfSocketAddr = dn.getSelfAddr().toString(); selfSocketAddr = dn.getXferAddress().toString();
System.out.println("DN Self Socket Addr == " + selfSocketAddr); System.out.println("DN Self Socket Addr == " + selfSocketAddr);
// assert that default self socket address is 0.0.0.0 // assert that default self socket address is 0.0.0.0
assertTrue(selfSocketAddr.contains("/0.0.0.0:")); assertTrue(selfSocketAddr.contains("/0.0.0.0:"));

View File

@ -269,7 +269,7 @@ private void blockCorruptionRecoveryPolicy(int numDataNodes,
if (corruptReplica(block, i)) { if (corruptReplica(block, i)) {
corruptReplicasDNIDs[j++] = i; corruptReplicasDNIDs[j++] = i;
LOG.info("successfully corrupted block " + block + " on node " LOG.info("successfully corrupted block " + block + " on node "
+ i + " " + cluster.getDataNodes().get(i).getSelfAddr()); + i + " " + cluster.getDataNodes().get(i).getDisplayName());
} }
} }
@ -281,7 +281,7 @@ private void blockCorruptionRecoveryPolicy(int numDataNodes,
for (int i = numCorruptReplicas - 1; i >= 0 ; i--) { for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
LOG.info("restarting node with corrupt replica: position " LOG.info("restarting node with corrupt replica: position "
+ i + " node " + corruptReplicasDNIDs[i] + " " + i + " node " + corruptReplicasDNIDs[i] + " "
+ cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getSelfAddr()); + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
cluster.restartDataNode(corruptReplicasDNIDs[i]); cluster.restartDataNode(corruptReplicasDNIDs[i]);
} }
@ -343,7 +343,7 @@ public void testTruncatedBlockReport() throws Exception {
if (!changeReplicaLength(block, 0, -1)) { if (!changeReplicaLength(block, 0, -1)) {
throw new IOException( throw new IOException(
"failed to find or change length of replica on node 0 " "failed to find or change length of replica on node 0 "
+ cluster.getDataNodes().get(0).getSelfAddr()); + cluster.getDataNodes().get(0).getDisplayName());
} }
} finally { } finally {
cluster.shutdown(); cluster.shutdown();

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -58,8 +59,9 @@ public static void setUp() throws Exception {
cluster = (new MiniDFSCluster.Builder(conf)) cluster = (new MiniDFSCluster.Builder(conf))
.numDataNodes(1).build(); .numDataNodes(1).build();
nnAddress = cluster.getNameNode().getNameNodeAddress(); nnAddress = cluster.getNameNode().getNameNodeAddress();
dnAddress = new InetSocketAddress(cluster.getDataNodes().get(0) DataNode dn = cluster.getDataNodes().get(0);
.getDatanodeId().getHost(), cluster.getDataNodes().get(0).getIpcPort()); dnAddress = new InetSocketAddress(dn.getDatanodeId().getHost(),
dn.getIpcPort());
} }
@AfterClass @AfterClass

View File

@ -432,8 +432,8 @@ public void testConvertDatanodeRegistration() {
new StorageInfo(), expKeys); new StorageInfo(), expKeys);
DatanodeRegistrationProto proto = PBHelper.convert(reg); DatanodeRegistrationProto proto = PBHelper.convert(reg);
DatanodeRegistration reg2 = PBHelper.convert(proto); DatanodeRegistration reg2 = PBHelper.convert(proto);
compare(reg.storageInfo, reg2.storageInfo); compare(reg.getStorageInfo(), reg2.getStorageInfo());
compare(reg.exportedKeys, reg2.exportedKeys); compare(reg.getExportedKeys(), reg2.getExportedKeys());
compare((DatanodeID)reg, (DatanodeID)reg2); compare((DatanodeID)reg, (DatanodeID)reg2);
} }

View File

@ -37,11 +37,6 @@
* *
*/ */
public class DataNodeTestUtils { public class DataNodeTestUtils {
public static DatanodeRegistration
getDNRegistrationByMachineName(DataNode dn, String mName) {
return dn.getDNRegistrationByMachineName(mName);
}
public static DatanodeRegistration public static DatanodeRegistration
getDNRegistrationForBP(DataNode dn, String bpid) throws IOException { getDNRegistrationForBP(DataNode dn, String bpid) throws IOException {
return dn.getDNRegistrationForBP(bpid); return dn.getDNRegistrationForBP(bpid);

View File

@ -383,7 +383,7 @@ private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
public SimulatedFSDataset(DataNode datanode, DataStorage storage, public SimulatedFSDataset(DataNode datanode, DataStorage storage,
Configuration conf) { Configuration conf) {
if (storage != null) { if (storage != null) {
storage.createStorageID(datanode.getPort()); storage.createStorageID(datanode.getXferPort());
this.storageId = storage.getStorageID(); this.storageId = storage.getStorageID();
} else { } else {
this.storageId = "unknownStorageId" + new Random().nextInt(); this.storageId = "unknownStorageId" + new Random().nextInt();

View File

@ -679,8 +679,9 @@ private void startDNandWait(Path filePath, boolean waitReplicas)
assertEquals(datanodes.size(), 2); assertEquals(datanodes.size(), 2);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
int lastDn = datanodes.size() - 1;
LOG.debug("New datanode " LOG.debug("New datanode "
+ cluster.getDataNodes().get(datanodes.size() - 1).getMachineName() + cluster.getDataNodes().get(lastDn).getDisplayName()
+ " has been started"); + " has been started");
} }
if (waitReplicas) DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR); if (waitReplicas) DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);

View File

@ -183,7 +183,7 @@ public void testDfsAdminDeleteBlockPool() throws Exception {
Assert.assertEquals(1, dn1.getAllBpOs().length); Assert.assertEquals(1, dn1.getAllBpOs().length);
DFSAdmin admin = new DFSAdmin(nn1Conf); DFSAdmin admin = new DFSAdmin(nn1Conf);
String dn1Address = dn1.getSelfAddr().getHostName()+":"+dn1.getIpcPort(); String dn1Address = dn1.getDatanodeId().getHost() + ":" + dn1.getIpcPort();
String[] args = { "-deleteBlockPool", dn1Address, bpid2 }; String[] args = { "-deleteBlockPool", dn1Address, bpid2 };
int ret = admin.run(args); int ret = admin.run(args);

View File

@ -136,7 +136,7 @@ public void testReplicationError() throws Exception {
DataNode datanode = cluster.getDataNodes().get(sndNode); DataNode datanode = cluster.getDataNodes().get(sndNode);
// replicate the block to the second datanode // replicate the block to the second datanode
InetSocketAddress target = datanode.getSelfAddr(); InetSocketAddress target = datanode.getXferAddress();
Socket s = new Socket(target.getAddress(), target.getPort()); Socket s = new Socket(target.getAddress(), target.getPort());
// write the header. // write the header.
DataOutputStream out = new DataOutputStream(s.getOutputStream()); DataOutputStream out = new DataOutputStream(s.getOutputStream());