HDFS-3224. Bug in check for DN re-registration with different storage ID. Contributed by Jason Lowe
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1396798 13f79535-47bb-0310-9956-ffa450edef68
parent c9a1d4dc33
commit 08f35a04c6
CHANGES.txt
@@ -1705,6 +1705,9 @@ Release 0.23.5 - UNRELEASED
     HDFS-3824. TestHftpDelegationToken fails intermittently with JDK7 (Trevor
     Robinson via tgraves)
 
+    HDFS-3224. Bug in check for DN re-registration with different storage ID
+    (jlowe)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES
DatanodeManager.java
@@ -612,7 +612,8 @@ public void registerDatanode(DatanodeRegistration nodeReg)
         + " storage " + nodeReg.getStorageID());
 
     DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getXferAddr());
+    DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
+        nodeReg.getIpAddr(), nodeReg.getXferPort());
 
     if (nodeN != null && nodeN != nodeS) {
       NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
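The hunk above is the heart of the fix. Judging from the replaced and added lines, DatanodeID.getXferAddr() returns the transfer address as an "ip:port" string, while getDatanodeByHost() keys its lookup on the bare IP, so the old cross-check for a node re-registering at the same address could never find a match. The following is a minimal sketch of that mismatch using hypothetical stand-in classes (XferAddrLookupSketch, SimpleNode), not the real HDFS registration path:

import java.util.HashMap;
import java.util.Map;

// Minimal sketch, not HDFS code: a host map keyed by bare IP, mirroring the
// shape of the lookup that registerDatanode() used before this change.
public class XferAddrLookupSketch {
  static class SimpleNode {
    final String ipAddr;
    final int xferPort;
    SimpleNode(String ipAddr, int xferPort) {
      this.ipAddr = ipAddr;
      this.xferPort = xferPort;
    }
    String getXferAddr() {            // mirrors DatanodeID.getXferAddr()
      return ipAddr + ":" + xferPort; // "ip:port", not a bare IP
    }
  }

  public static void main(String[] args) {
    Map<String, SimpleNode> byIp = new HashMap<String, SimpleNode>(); // keyed by IP only
    SimpleNode registered = new SimpleNode("127.0.0.1", 12345);
    byIp.put(registered.ipAddr, registered);

    // Old check: look up the "ip:port" string in an IP-keyed map -> never matches.
    SimpleNode oldLookup = byIp.get(registered.getXferAddr());
    System.out.println("lookup by getXferAddr(): " + oldLookup);  // null

    // Fixed check: look up by IP, then match on the transfer port.
    SimpleNode candidate = byIp.get(registered.ipAddr);
    SimpleNode newLookup =
        (candidate != null && candidate.xferPort == 12345) ? candidate : null;
    System.out.println("lookup by (ip, xferPort): " + newLookup); // the node
  }
}

With the corrected lookup, a DataNode that comes back with a new storage ID but the same IP and transfer port is recognized as the same node, which is what the new test further down exercises.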
Host2NodesMap.java
@@ -159,6 +159,35 @@ DatanodeDescriptor getDatanodeByHost(String ipAddr) {
     }
   }
 
+  /**
+   * Find data node by its transfer address
+   *
+   * @return DatanodeDescriptor if found or null otherwise
+   */
+  public DatanodeDescriptor getDatanodeByXferAddr(String ipAddr,
+      int xferPort) {
+    if (ipAddr==null) {
+      return null;
+    }
+
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(ipAddr);
+      // no entry
+      if (nodes== null) {
+        return null;
+      }
+      for(DatanodeDescriptor containedNode:nodes) {
+        if (xferPort == containedNode.getXferPort()) {
+          return containedNode;
+        }
+      }
+      return null;
+    } finally {
+      hostmapLock.readLock().unlock();
+    }
+  }
+
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
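The new helper takes both the IP address and the transfer port because a single IP can host several DataNodes (a MiniDFSCluster, for instance, runs them all on 127.0.0.1), which is also why the map keeps an array of descriptors per IP. A small self-contained sketch of that lookup pattern follows; DataNodeStub and findByXferAddr are hypothetical names, and the real Host2NodesMap additionally guards the scan with its read lock, which the sketch omits.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: illustrates why the lookup needs (ip, xferPort) rather than
// just the IP. DataNodeStub is a stand-in, not the real DatanodeDescriptor.
public class PerHostLookupSketch {
  static class DataNodeStub {
    final String ipAddr;
    final int xferPort;
    final String storageId;
    DataNodeStub(String ipAddr, int xferPort, String storageId) {
      this.ipAddr = ipAddr;
      this.xferPort = xferPort;
      this.storageId = storageId;
    }
  }

  // Entries are kept per IP; the transfer port selects the exact node.
  static DataNodeStub findByXferAddr(Map<String, List<DataNodeStub>> byIp,
      String ipAddr, int xferPort) {
    List<DataNodeStub> nodes = byIp.get(ipAddr);
    if (nodes == null) {
      return null;
    }
    for (DataNodeStub node : nodes) {
      if (node.xferPort == xferPort) {
        return node;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Map<String, List<DataNodeStub>> byIp = new HashMap<String, List<DataNodeStub>>();
    List<DataNodeStub> local = new ArrayList<DataNodeStub>();
    local.add(new DataNodeStub("127.0.0.1", 12345, "storage-a"));
    local.add(new DataNodeStub("127.0.0.1", 22345, "storage-b"));
    byIp.put("127.0.0.1", local);

    // The transfer port disambiguates the two co-located nodes.
    System.out.println(findByXferAddr(byIp, "127.0.0.1", 22345).storageId); // storage-b
    System.out.println(findByXferAddr(byIp, "127.0.0.1", 33345));           // null
  }
}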
TestDatanodeRegistration.java
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -91,6 +92,58 @@ public void testChangeIpcPort() throws Exception {
     }
   }
 
+  @Test
+  public void testChangeStorageID() throws Exception {
+    final String DN_IP_ADDR = "127.0.0.1";
+    final String DN_HOSTNAME = "localhost";
+    final int DN_XFER_PORT = 12345;
+    final int DN_INFO_PORT = 12346;
+    final int DN_IPC_PORT = 12347;
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(0)
+          .build();
+      InetSocketAddress addr = new InetSocketAddress(
+          "localhost",
+          cluster.getNameNodePort());
+      DFSClient client = new DFSClient(addr, conf);
+      NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
+
+      // register a datanode
+      DatanodeID dnId = new DatanodeID(DN_IP_ADDR, DN_HOSTNAME,
+          "fake-storage-id", DN_XFER_PORT, DN_INFO_PORT, DN_IPC_PORT);
+      long nnCTime = cluster.getNamesystem().getFSImage().getStorage()
+          .getCTime();
+      StorageInfo mockStorageInfo = mock(StorageInfo.class);
+      doReturn(nnCTime).when(mockStorageInfo).getCTime();
+      doReturn(HdfsConstants.LAYOUT_VERSION).when(mockStorageInfo)
+          .getLayoutVersion();
+      DatanodeRegistration dnReg = new DatanodeRegistration(dnId,
+          mockStorageInfo, null, VersionInfo.getVersion());
+      rpcServer.registerDatanode(dnReg);
+
+      DatanodeInfo[] report = client.datanodeReport(DatanodeReportType.ALL);
+      assertEquals("Expected a registered datanode", 1, report.length);
+
+      // register the same datanode again with a different storage ID
+      dnId = new DatanodeID(DN_IP_ADDR, DN_HOSTNAME,
+          "changed-fake-storage-id", DN_XFER_PORT, DN_INFO_PORT, DN_IPC_PORT);
+      dnReg = new DatanodeRegistration(dnId,
+          mockStorageInfo, null, VersionInfo.getVersion());
+      rpcServer.registerDatanode(dnReg);
+
+      report = client.datanodeReport(DatanodeReportType.ALL);
+      assertEquals("Datanode with changed storage ID not recognized",
+          1, report.length);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   @Test
   public void testRegistrationWithDifferentSoftwareVersions() throws Exception {
     Configuration conf = new HdfsConfiguration();