HDFS-9004. Add upgrade domain to DatanodeInfo. Contributed by Ming Ma (via Lei (Eddy) Xu).
Change-Id: I887c66578eebd61acc34b94f18da6e6851c609f4
This commit is contained in:
parent
c39ddc306d
commit
3a9c7076e8
@ -53,7 +53,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||
private String location = NetworkTopology.DEFAULT_RACK;
|
||||
private String softwareVersion;
|
||||
private List<String> dependentHostNames = new LinkedList<String>();
|
||||
|
||||
private String upgradeDomain;
|
||||
|
||||
// Datanode administrative states
|
||||
public enum AdminStates {
|
||||
@ -95,6 +95,7 @@ public DatanodeInfo(DatanodeInfo from) {
|
||||
this.xceiverCount = from.getXceiverCount();
|
||||
this.location = from.getNetworkLocation();
|
||||
this.adminState = from.getAdminState();
|
||||
this.upgradeDomain = from.getUpgradeDomain();
|
||||
}
|
||||
|
||||
public DatanodeInfo(DatanodeID nodeID) {
|
||||
@ -120,12 +121,13 @@ public DatanodeInfo(DatanodeID nodeID, String location,
|
||||
final long capacity, final long dfsUsed, final long remaining,
|
||||
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
|
||||
final long lastUpdate, final long lastUpdateMonotonic,
|
||||
final int xceiverCount, final AdminStates adminState) {
|
||||
final int xceiverCount, final AdminStates adminState,
|
||||
final String upgradeDomain) {
|
||||
this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getDatanodeUuid(),
|
||||
nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(),
|
||||
nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed,
|
||||
cacheCapacity, cacheUsed, lastUpdate, lastUpdateMonotonic,
|
||||
xceiverCount, location, adminState);
|
||||
xceiverCount, location, adminState, upgradeDomain);
|
||||
}
|
||||
|
||||
/** Constructor */
|
||||
@ -137,6 +139,22 @@ public DatanodeInfo(final String ipAddr, final String hostName,
|
||||
final long lastUpdate, final long lastUpdateMonotonic,
|
||||
final int xceiverCount, final String networkLocation,
|
||||
final AdminStates adminState) {
|
||||
this(ipAddr, hostName, datanodeUuid, xferPort, infoPort, infoSecurePort,
|
||||
ipcPort, capacity, dfsUsed, remaining, blockPoolUsed, cacheCapacity,
|
||||
cacheUsed, lastUpdate, lastUpdateMonotonic, xceiverCount,
|
||||
networkLocation, adminState, null);
|
||||
}
|
||||
|
||||
/** Constructor */
|
||||
public DatanodeInfo(final String ipAddr, final String hostName,
|
||||
final String datanodeUuid, final int xferPort, final int infoPort,
|
||||
final int infoSecurePort, final int ipcPort,
|
||||
final long capacity, final long dfsUsed, final long remaining,
|
||||
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
|
||||
final long lastUpdate, final long lastUpdateMonotonic,
|
||||
final int xceiverCount, final String networkLocation,
|
||||
final AdminStates adminState,
|
||||
final String upgradeDomain) {
|
||||
super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
|
||||
infoSecurePort, ipcPort);
|
||||
this.capacity = capacity;
|
||||
@ -150,6 +168,7 @@ public DatanodeInfo(final String ipAddr, final String hostName,
|
||||
this.xceiverCount = xceiverCount;
|
||||
this.location = networkLocation;
|
||||
this.adminState = adminState;
|
||||
this.upgradeDomain = upgradeDomain;
|
||||
}
|
||||
|
||||
/** Network location name */
|
||||
@ -300,6 +319,16 @@ public synchronized void setNetworkLocation(String location) {
|
||||
this.location = NodeBase.normalize(location);
|
||||
}
|
||||
|
||||
/** Sets the upgrade domain */
|
||||
public void setUpgradeDomain(String upgradeDomain) {
|
||||
this.upgradeDomain = upgradeDomain;
|
||||
}
|
||||
|
||||
/** upgrade domain */
|
||||
public String getUpgradeDomain() {
|
||||
return upgradeDomain;
|
||||
}
|
||||
|
||||
/** Add a hostname to a list of network dependencies */
|
||||
public void addDependentHostName(String hostname) {
|
||||
dependentHostNames.add(hostname);
|
||||
@ -341,6 +370,9 @@ public String getDatanodeReport() {
|
||||
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
|
||||
buffer.append("Rack: "+location+"\n");
|
||||
}
|
||||
if (upgradeDomain != null) {
|
||||
buffer.append("Upgrade domain: "+ upgradeDomain +"\n");
|
||||
}
|
||||
buffer.append("Decommission Status : ");
|
||||
if (isDecommissioned()) {
|
||||
buffer.append("Decommissioned\n");
|
||||
@ -380,6 +412,9 @@ public String dumpDatanode() {
|
||||
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
|
||||
buffer.append(" "+location);
|
||||
}
|
||||
if (upgradeDomain != null) {
|
||||
buffer.append(" " + upgradeDomain);
|
||||
}
|
||||
if (isDecommissioned()) {
|
||||
buffer.append(" DD");
|
||||
} else if (isDecommissionInProgress()) {
|
||||
|
@ -140,6 +140,9 @@ public static DatanodeInfoProto convert(DatanodeInfo info) {
|
||||
if (info.getNetworkLocation() != null) {
|
||||
builder.setLocation(info.getNetworkLocation());
|
||||
}
|
||||
if (info.getUpgradeDomain() != null) {
|
||||
builder.setUpgradeDomain(info.getUpgradeDomain());
|
||||
}
|
||||
builder
|
||||
.setId(convert((DatanodeID) info))
|
||||
.setCapacity(info.getCapacity())
|
||||
|
@ -241,7 +241,8 @@ static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
|
||||
getLong(m, "lastUpdateMonotonic", 0l),
|
||||
getInt(m, "xceiverCount", 0),
|
||||
getString(m, "networkLocation", ""),
|
||||
DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
|
||||
DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")),
|
||||
getString(m, "upgradeDomain", ""));
|
||||
}
|
||||
|
||||
/** Convert an Object[] to a DatanodeInfo[]. */
|
||||
|
@ -98,6 +98,7 @@ message DatanodeInfoProto {
|
||||
optional uint64 cacheCapacity = 11 [default = 0];
|
||||
optional uint64 cacheUsed = 12 [default = 0];
|
||||
optional uint64 lastUpdateMonotonic = 13 [default = 0];
|
||||
optional string upgradeDomain = 14;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -594,7 +594,8 @@ static public DatanodeInfo convert(DatanodeInfoProto di) {
|
||||
di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
|
||||
di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
|
||||
di.getLastUpdate(), di.getLastUpdateMonotonic(),
|
||||
di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
|
||||
di.getXceiverCount(), PBHelper.convert(di.getAdminState()),
|
||||
di.hasUpgradeDomain() ? di.getUpgradeDomain() : null);
|
||||
}
|
||||
|
||||
static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
|
||||
|
@ -5922,6 +5922,9 @@ public String getLiveNodes() {
|
||||
.put("estimatedCapacityLostTotal",
|
||||
volumeFailureSummary.getEstimatedCapacityLostTotal());
|
||||
}
|
||||
if (node.getUpgradeDomain() != null) {
|
||||
innerinfo.put("upgradeDomain", node.getUpgradeDomain());
|
||||
}
|
||||
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo.build());
|
||||
}
|
||||
return JSON.toString(info);
|
||||
|
@ -169,6 +169,9 @@ static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) {
|
||||
m.put("xceiverCount", datanodeinfo.getXceiverCount());
|
||||
m.put("networkLocation", datanodeinfo.getNetworkLocation());
|
||||
m.put("adminState", datanodeinfo.getAdminState().name());
|
||||
if (datanodeinfo.getUpgradeDomain() != null) {
|
||||
m.put("upgradeDomain", datanodeinfo.getUpgradeDomain());
|
||||
}
|
||||
return m;
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,8 @@
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
@ -76,6 +78,15 @@ public void testNameNodeMXBeanInfo() throws Exception {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||
cluster.waitActive();
|
||||
|
||||
// Set upgrade domain on the first DN.
|
||||
String upgradeDomain = "abcd";
|
||||
DatanodeManager dm = cluster.getNameNode().getNamesystem().
|
||||
getBlockManager().getDatanodeManager();
|
||||
DatanodeDescriptor dd = dm.getDatanode(
|
||||
cluster.getDataNodes().get(0).getDatanodeId());
|
||||
dd.setUpgradeDomain(upgradeDomain);
|
||||
String dnXferAddrWithUpgradeDomainSet = dd.getXferAddr();
|
||||
|
||||
FSNamesystem fsn = cluster.getNameNode().namesystem;
|
||||
|
||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||
@ -125,6 +136,15 @@ public void testNameNodeMXBeanInfo() throws Exception {
|
||||
assertTrue(((Long)liveNode.get("capacity")) > 0);
|
||||
assertTrue(liveNode.containsKey("numBlocks"));
|
||||
assertTrue(((Long)liveNode.get("numBlocks")) == 0);
|
||||
// a. By default the upgrade domain isn't defined on any DN.
|
||||
// b. If the upgrade domain is set on a DN, JMX should have the same
|
||||
// value.
|
||||
String xferAddr = (String)liveNode.get("xferaddr");
|
||||
if (!xferAddr.equals(dnXferAddrWithUpgradeDomainSet)) {
|
||||
assertTrue(!liveNode.containsKey("upgradeDomain"));
|
||||
} else {
|
||||
assertTrue(liveNode.get("upgradeDomain").equals(upgradeDomain));
|
||||
}
|
||||
}
|
||||
assertEquals(fsn.getLiveNodes(), alivenodeinfo);
|
||||
// get attribute deadnodeinfo
|
||||
|
Loading…
Reference in New Issue
Block a user