diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java index 79fa1746d1..2d00da86c9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java @@ -64,50 +64,57 @@ public VersionEndpointTask(EndpointStateMachine rpcEndPoint, public EndpointStateMachine.EndPointStates call() throws Exception { rpcEndPoint.lock(); try{ - SCMVersionResponseProto versionResponse = - rpcEndPoint.getEndPoint().getVersion(null); - VersionResponse response = VersionResponse.getFromProtobuf( - versionResponse); - rpcEndPoint.setVersion(response); + if (rpcEndPoint.getState().equals( + EndpointStateMachine.EndPointStates.GETVERSION)) { + SCMVersionResponseProto versionResponse = + rpcEndPoint.getEndPoint().getVersion(null); + VersionResponse response = VersionResponse.getFromProtobuf( + versionResponse); + rpcEndPoint.setVersion(response); - String scmId = response.getValue(OzoneConsts.SCM_ID); - String clusterId = response.getValue(OzoneConsts.CLUSTER_ID); + String scmId = response.getValue(OzoneConsts.SCM_ID); + String clusterId = response.getValue(OzoneConsts.CLUSTER_ID); - // Check volumes - VolumeSet volumeSet = ozoneContainer.getVolumeSet(); - volumeSet.writeLock(); - try { - Map volumeMap = volumeSet.getVolumeMap(); + // Check volumes + VolumeSet volumeSet = ozoneContainer.getVolumeSet(); + volumeSet.writeLock(); + try { + Map volumeMap = volumeSet.getVolumeMap(); - Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " + - "null"); - Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " + - "cannot be null"); + Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " + + "null"); + Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " + + "cannot be null"); - // If version file does not exist create version file and also set scmId - for (Map.Entry entry : volumeMap.entrySet()) { - HddsVolume hddsVolume = entry.getValue(); - boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId, - clusterId, LOG); - if (!result) { - volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath()); + // If version file does not exist create version file and also set scmId + + for (Map.Entry entry : volumeMap.entrySet()) { + HddsVolume hddsVolume = entry.getValue(); + boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId, + clusterId, LOG); + if (!result) { + volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath()); + } } + if (volumeSet.getVolumesList().size() == 0) { + // All volumes are in inconsistent state + throw new DiskOutOfSpaceException("All configured Volumes are in " + + "Inconsistent State"); + } + } finally { + volumeSet.writeUnlock(); } - if (volumeSet.getVolumesList().size() == 0) { - // All volumes are in inconsistent state - throw new DiskOutOfSpaceException("All configured Volumes are in " + - "Inconsistent State"); - } - } finally { - volumeSet.writeUnlock(); + + ozoneContainer.getDispatcher().setScmId(scmId); + + EndpointStateMachine.EndPointStates nextState = + rpcEndPoint.getState().getNextState(); + rpcEndPoint.setState(nextState); + rpcEndPoint.zeroMissedCount(); + } else { + LOG.debug("Cannot execute GetVersion task as endpoint state machine " + + "is in {} state", rpcEndPoint.getState()); } - - ozoneContainer.getDispatcher().setScmId(scmId); - - EndpointStateMachine.EndPointStates nextState = - rpcEndPoint.getState().getNextState(); - rpcEndPoint.setState(nextState); - rpcEndPoint.zeroMissedCount(); } catch (DiskOutOfSpaceException ex) { rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); } catch(IOException ex) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index 0ca47493b4..3083660676 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -23,9 +23,13 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; @@ -214,4 +218,50 @@ private void createMalformedIDFile(File malformedFile) out.write("malformed".getBytes()); out.close(); } + + /** + * Test that a DN can register with SCM even if it was started before the SCM. + * @throws Exception + */ + @Test (timeout = 300_000) + public void testDNstartAfterSCM() throws Exception { + // Start a cluster with 1 DN + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(1) + .build(); + cluster.waitForClusterToBeReady(); + + // Stop the SCM + StorageContainerManager scm = cluster.getStorageContainerManager(); + scm.stop(); + + // Restart DN + cluster.restartHddsDatanode(0, false); + + // DN should be in GETVERSION state till the SCM is restarted. + // Check DN endpoint state for 20 seconds + DatanodeStateMachine dnStateMachine = cluster.getHddsDatanodes().get(0) + .getDatanodeStateMachine(); + for (int i = 0; i < 20; i++) { + for (EndpointStateMachine endpoint : + dnStateMachine.getConnectionManager().getValues()) { + Assert.assertEquals( + EndpointStateMachine.EndPointStates.GETVERSION, + endpoint.getState()); + } + Thread.sleep(1000); + } + + // DN should successfully register with the SCM after SCM is restarted. + // Restart the SCM + cluster.restartStorageContainerManager(); + // Wait for DN to register + cluster.waitForClusterToBeReady(); + // DN should be in HEARTBEAT state after registering with the SCM + for (EndpointStateMachine endpoint : + dnStateMachine.getConnectionManager().getValues()) { + Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT, + endpoint.getState()); + } + } }