From 8c4680852b20ad0e65e77dd123c9ba5bb6f2fa39 Mon Sep 17 00:00:00 2001
From: Mingliang Liu
Date: Mon, 5 Dec 2016 11:34:13 -0800
Subject: [PATCH] HDFS-11094. Send back HAState along with NamespaceInfo during
 a versionRequest as an optional parameter. Contributed by Eric Badger

---
 .../hadoop/hdfs/protocolPB/PBHelper.java      | 80 ++++++++++++-------
 .../hdfs/server/datanode/BPOfferService.java  | 10 ++-
 .../hdfs/server/datanode/BPServiceActor.java  |  4 +-
 .../hdfs/server/namenode/FSNamesystem.java    |  8 +-
 .../hdfs/server/protocol/NamespaceInfo.java   | 26 ++++++
 .../src/main/proto/HdfsServer.proto           |  2 +
 .../server/datanode/TestBPOfferService.java   | 31 +++++++
 .../server/namenode/TestFSNamesystem.java     | 21 +++++
 8 files changed, 150 insertions(+), 32 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 78371f5b0f..1e6d88249e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -26,7 +26,7 @@
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -338,7 +338,8 @@ public static NamespaceInfo convert(NamespaceInfoProto info) {
     StorageInfoProto storage = info.getStorageInfo();
     return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
         info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
-        info.getSoftwareVersion(), info.getCapabilities());
+        info.getSoftwareVersion(), info.getCapabilities(),
+        convert(info.getState()));
   }
 
   public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@@ -744,43 +745,68 @@ public static ReceivedDeletedBlockInfo convert(
   }
 
   public static NamespaceInfoProto convert(NamespaceInfo info) {
-    return NamespaceInfoProto.newBuilder()
-        .setBlockPoolID(info.getBlockPoolID())
+    NamespaceInfoProto.Builder builder = NamespaceInfoProto.newBuilder();
+    builder.setBlockPoolID(info.getBlockPoolID())
         .setBuildVersion(info.getBuildVersion())
         .setUnused(0)
         .setStorageInfo(PBHelper.convert((StorageInfo)info))
         .setSoftwareVersion(info.getSoftwareVersion())
-        .setCapabilities(info.getCapabilities())
-        .build();
+        .setCapabilities(info.getCapabilities());
+    HAServiceState state = info.getState();
+    if(state != null) {
+      builder.setState(convert(info.getState()));
+    }
+    return builder.build();
+  }
+
+  public static HAServiceState convert(HAServiceStateProto s) {
+    if (s == null) {
+      return null;
+    }
+    switch (s) {
+    case INITIALIZING:
+      return HAServiceState.INITIALIZING;
+    case ACTIVE:
+      return HAServiceState.ACTIVE;
+    case STANDBY:
+      return HAServiceState.STANDBY;
+    default:
+      throw new IllegalArgumentException("Unexpected HAServiceStateProto:" +
+          s);
+    }
+  }
+
+  public static HAServiceStateProto convert(HAServiceState s) {
+    if (s == null) {
+      return null;
+    }
+    switch (s) {
+    case INITIALIZING:
+      return HAServiceStateProto.INITIALIZING;
+    case ACTIVE:
+      return HAServiceStateProto.ACTIVE;
+    case STANDBY:
+      return HAServiceStateProto.STANDBY;
+    default:
+      throw new IllegalArgumentException("Unexpected HAServiceState:" +
+          s);
+    }
   }
 
   public static NNHAStatusHeartbeat convert(NNHAStatusHeartbeatProto s) {
-    if (s == null) return null;
-    switch (s.getState()) {
-    case ACTIVE:
-      return new NNHAStatusHeartbeat(HAServiceState.ACTIVE, s.getTxid());
-    case STANDBY:
-      return new NNHAStatusHeartbeat(HAServiceState.STANDBY, s.getTxid());
-    default:
-      throw new IllegalArgumentException("Unexpected NNHAStatusHeartbeat.State:" + s.getState());
+    if (s == null) {
+      return null;
     }
+    return new NNHAStatusHeartbeat(convert(s.getState()), s.getTxid());
   }
 
   public static NNHAStatusHeartbeatProto convert(NNHAStatusHeartbeat hb) {
-    if (hb == null) return null;
-    NNHAStatusHeartbeatProto.Builder builder =
-        NNHAStatusHeartbeatProto.newBuilder();
-    switch (hb.getState()) {
-    case ACTIVE:
-      builder.setState(HAServiceProtocolProtos.HAServiceStateProto.ACTIVE);
-      break;
-    case STANDBY:
-      builder.setState(HAServiceProtocolProtos.HAServiceStateProto.STANDBY);
-      break;
-    default:
-      throw new IllegalArgumentException("Unexpected NNHAStatusHeartbeat.State:" +
-          hb.getState());
+    if (hb == null) {
+      return null;
     }
+    NNHAStatusHeartbeatProto.Builder builder =
+        NNHAStatusHeartbeatProto.newBuilder();
+    builder.setState(convert(hb.getState()));
     builder.setTxid(hb.getTxId());
     return builder.build();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 00102eb56d..00e6b3ecb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -307,8 +307,16 @@ DataNode getDataNode() {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+  void verifyAndSetNamespaceInfo(BPServiceActor actor, NamespaceInfo nsInfo)
+      throws IOException {
     writeLock();
+
+    if(nsInfo.getState() == HAServiceState.ACTIVE
+        && bpServiceToActive == null) {
+      LOG.info("Acknowledging ACTIVE Namenode during handshake " + actor);
+      bpServiceToActive = actor;
+    }
+
     try {
       if (this.bpNSInfo == null) {
         this.bpNSInfo = nsInfo;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f3247fca27..dffe14f457 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -269,11 +269,11 @@ private void connectToNNAndHandshake() throws IOException {
     // First phase of the handshake with NN - get the namespace
     // info.
     NamespaceInfo nsInfo = retrieveNamespaceInfo();
-    
+
     // Verify that this matches the other NN in this HA pair.
     // This also initializes our block pool in the DN if we are
     // the first NN connection for this BP.
-    bpos.verifyAndSetNamespaceInfo(nsInfo);
+    bpos.verifyAndSetNamespaceInfo(this, nsInfo);
 
     // Second phase of the handshake with the NN.
     register(nsInfo);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8a750a06ce..90fb924ee9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1594,7 +1594,7 @@ long getCTime() {
   NamespaceInfo unprotectedGetNamespaceInfo() {
     return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
         getClusterId(), getBlockPoolId(),
-        getFSImage().getStorage().getCTime());
+        getFSImage().getStorage().getCTime(), getState());
   }
 
   /**
@@ -4531,12 +4531,16 @@ public long getMillisSinceLastLoadedEdits() {
       return 0;
     }
   }
-  
+
   @Metric
   public int getBlockCapacity() {
     return blockManager.getCapacity();
   }
 
+  public HAServiceState getState() {
+    return haContext == null ? null : haContext.getState().getServiceState();
+  }
+
   @Override // FSNamesystemMBean
   public String getFSState() {
     return isInSafeMode() ? "safeMode" : "Operational";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
index 90d0aacc4f..66ce9ee52e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -44,6 +45,7 @@ public class NamespaceInfo extends StorageInfo {
   String blockPoolID = "";    // id of the block pool
   String softwareVersion;
   long capabilities;
+  HAServiceState state;
 
   // only authoritative on the server-side to determine advertisement to
  // clients.  enum will update the supported values
@@ -88,6 +90,14 @@ public NamespaceInfo(int nsID, String clusterID, String bpID,
         CAPABILITIES_SUPPORTED);
   }
 
+  public NamespaceInfo(int nsID, String clusterID, String bpID,
+      long cT, String buildVersion, String softwareVersion,
+      long capabilities, HAServiceState st) {
+    this(nsID, clusterID, bpID, cT, buildVersion, softwareVersion,
+        capabilities);
+    this.state = st;
+  }
+
   // for use by server and/or client
   public NamespaceInfo(int nsID, String clusterID, String bpID,
       long cT, String buildVersion, String softwareVersion,
@@ -105,6 +115,13 @@ public NamespaceInfo(int nsID, String clusterID, String bpID,
     this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),
         VersionInfo.getVersion());
   }
+
+  public NamespaceInfo(int nsID, String clusterID, String bpID,
+      long cT, HAServiceState st) {
+    this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),
+        VersionInfo.getVersion());
+    this.state = st;
+  }
 
   public long getCapabilities() {
     return capabilities;
@@ -115,6 +132,11 @@ public NamespaceInfo(int nsID, String clusterID, String bpID,
     this.capabilities = capabilities;
   }
 
+  @VisibleForTesting
+  public void setState(HAServiceState state) {
+    this.state = state;
+  }
+
   public boolean isCapabilitySupported(Capability capability) {
     Preconditions.checkArgument(capability != Capability.UNKNOWN,
         "cannot test for unknown capability");
@@ -134,6 +156,10 @@ public String getSoftwareVersion() {
     return softwareVersion;
   }
 
+  public HAServiceState getState() {
+    return state;
+  }
+
   @Override
   public String toString(){
     return super.toString() + ";bpid=" + blockPoolID;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
index 910e03bef5..d7deebf167 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
@@ -32,6 +32,7 @@ option java_generate_equals_and_hash = true;
 package hadoop.hdfs;
 
 import "hdfs.proto";
+import "HAServiceProtocol.proto";
 
 /**
  * Block access token information
@@ -101,6 +102,7 @@ message NamespaceInfoProto {
   required StorageInfoProto storageInfo = 4;// Node information
   required string softwareVersion = 5;      // Software version number (e.g. 2.0.0)
   optional uint64 capabilities = 6 [default = 0]; // feature flags
+  optional hadoop.common.HAServiceStateProto state = 7;
 }
 
 /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 2d50c75763..f8f0a3c460 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -20,6 +20,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -799,4 +800,34 @@ private int getStandbyIBRSize(BPOfferService bpos) {
     }
     return -1;
   }
+
+  /*
+   * Verify that the DataNode picks up the active NameNode from the HA state
+   * returned by versionRequest() during the initial handshake.
+   */
+  @Test
+  public void testNNHAStateUpdateFromVersionRequest() throws Exception {
+    final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    BPServiceActor actor = bpos.getBPServiceActors().get(0);
+    bpos.start();
+    waitForInitialization(bpos);
+    // Should start with neither NN as active.
+    assertNull(bpos.getActiveNN());
+
+    // getNamespaceInfo() will not include HAServiceState
+    NamespaceInfo nsInfo = mockNN1.versionRequest();
+    bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
+
+    assertNull(bpos.getActiveNN());
+
+    // Change mock so getNamespaceInfo() will include HAServiceState
+    Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0,
+        HAServiceState.ACTIVE)).when(mockNN1).versionRequest();
+
+    // Update the bpos NamespaceInfo
+    nsInfo = mockNN1.versionRequest();
+    bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
+
+    assertNotNull(bpos.getActiveNN());
+
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
index f02c679f38..6a0dd6fd9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.junit.After;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -154,6 +156,25 @@ public void testReplQueuesActiveAfterStartupSafemode() throws IOException, Inter
         + "safemode 2nd time", bm.isPopulatingReplQueues());
   }
 
+  @Test
+  public void testHAStateInNamespaceInfo() throws IOException {
+    Configuration conf = new Configuration();
+
+    FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
+    FSImage fsImage = Mockito.mock(FSImage.class);
+    Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
+    NNStorage nnStorage = Mockito.mock(NNStorage.class);
+    Mockito.when(fsImage.getStorage()).thenReturn(nnStorage);
+
+    FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage);
+    FSNamesystem fsn = Mockito.spy(fsNamesystem);
+    Mockito.when(fsn.getState()).thenReturn(
+        HAServiceProtocol.HAServiceState.ACTIVE);
+
+    NamespaceInfo nsInfo = fsn.unprotectedGetNamespaceInfo();
+    assertNotNull(nsInfo.getState());
+  }
+
   @Test
   public void testReset() throws Exception {
     Configuration conf = new Configuration();