diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java index d782b59ba9..64e078d296 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java @@ -23,10 +23,14 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; @@ -37,6 +41,8 @@ */ public class VersionEndpointTask implements Callable { + public static final Logger LOG = LoggerFactory.getLogger(VersionEndpointTask + .class); private final EndpointStateMachine rpcEndPoint; private final Configuration configuration; private final OzoneContainer ozoneContainer; @@ -71,21 +77,32 @@ public EndpointStateMachine.EndPointStates call() throws Exception { Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " + "null"); - Preconditions.checkNotNull(scmId, "Reply from SCM: clusterId cannot be" + - " null"); + Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " + + "cannot be null"); // If version file does not exist create version file and also set scmId for (Map.Entry entry : volumeMap.entrySet()) { 
HddsVolume hddsVolume = entry.getValue(); - hddsVolume.format(clusterId); - ozoneContainer.getDispatcher().setScmId(scmId); + boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId, + clusterId, LOG); + if (!result) { + volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath()); + } } + if (volumeSet.getVolumesList().size() == 0) { + // All volumes are in an inconsistent state + throw new DiskOutOfSpaceException("All configured Volumes are in " + + "Inconsistent State"); + } + ozoneContainer.getDispatcher().setScmId(scmId); EndpointStateMachine.EndPointStates nextState = rpcEndPoint.getState().getNextState(); rpcEndPoint.setState(nextState); rpcEndPoint.zeroMissedCount(); - } catch (IOException ex) { + } catch (DiskOutOfSpaceException ex) { + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); + } catch(IOException ex) { rpcEndPoint.logIfNeeded(ex); } finally { rpcEndPoint.unlock(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java index 5d6fc0aa3f..bc0bd056b1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java @@ -25,8 +25,10 @@ import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.util.Time; +import org.slf4j.Logger; import java.io.File; +import java.io.IOException; import java.util.Properties; import java.util.UUID; @@ -160,4 +162,58 @@ private static String getProperty(Properties props, String propName, File } return value; } + + /** + * Check if the volume is in a consistent state or not. 
+ * @param hddsVolume + * @param scmId + * @param clusterId + * @param logger + * @return true - if volume is in consistent state, otherwise false. + */ + public static boolean checkVolume(HddsVolume hddsVolume, String scmId, String + clusterId, Logger logger) { + File hddsRoot = hddsVolume.getHddsRootDir(); + String volumeRoot = hddsRoot.getPath(); + File scmDir = new File(hddsRoot, scmId); + + try { + hddsVolume.format(clusterId); + } catch (IOException ex) { + logger.error("Error during formatting volume {}, exception is {}", + volumeRoot, ex); + return false; + } + + File[] hddsFiles = hddsRoot.listFiles(); + + if(hddsFiles == null) { + // This is the case for IOException, where listFiles returns null. + // So, we fail the volume. + return false; + } else if (hddsFiles.length == 1) { + // DN started for first time or this is a newly added volume. + // So we create scm directory. + if (!scmDir.mkdir()) { + logger.error("Unable to create scmDir {}", scmDir); + return false; + } + return true; + } else if(hddsFiles.length == 2) { + // The files should be Version and SCM directory + if (scmDir.exists()) { + return true; + } else { + logger.error("Volume {} is in Inconsistent state, expected scm " + + "directory {} does not exist", volumeRoot, scmDir + .getAbsolutePath()); + return false; + } + } else { + // The hdds root dir should always have 2 files. One is Version file + // and other is SCM directory. 
+ return false; + } + + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index 986aa16634..c1595b241e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; @@ -64,14 +65,16 @@ public class ContainerReader implements Runnable { private final ContainerSet containerSet; private final OzoneConfiguration config; private final File hddsVolumeDir; + private final VolumeSet volumeSet; - ContainerReader(HddsVolume volume, ContainerSet cset, OzoneConfiguration - conf) { + ContainerReader(VolumeSet volSet, HddsVolume volume, ContainerSet cset, + OzoneConfiguration conf) { Preconditions.checkNotNull(volume); this.hddsVolume = volume; this.hddsVolumeDir = hddsVolume.getHddsRootDir(); this.containerSet = cset; this.config = conf; + this.volumeSet = volSet; } @Override @@ -97,10 +100,18 @@ public boolean accept(File pathname) { }); if (scmDir == null) { - LOG.error("Volume {} is empty with out metadata and chunks", + LOG.error("IO error for the volume {}, skipped loading", hddsVolumeRootDir); + volumeSet.failVolume(hddsVolumeRootDir.getPath()); return; } + + if (scmDir.length > 1) { + LOG.error("Volume {} is in Inconsistent 
state", hddsVolumeRootDir); + volumeSet.failVolume(hddsVolumeRootDir.getPath()); + return; + } + for (File scmLoc : scmDir) { File currentDir = null; currentDir = new File(scmLoc, Storage.STORAGE_DIR_CURRENT); @@ -123,9 +134,8 @@ public boolean accept(File pathname) { verifyContainerFile(containerName, containerFile, checksumFile); } else { - LOG.error( - "Missing container metadata files for Container: " + - "{}", containerName); + LOG.error("Missing container metadata files for " + + "Container: {}", containerName); } } else { LOG.error("Missing container metadata directory for " + diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 8c3a0a271d..8f067d9f5c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -106,7 +106,7 @@ public void buildContainerSet() { while (volumeSetIterator.hasNext()) { HddsVolume volume = volumeSetIterator.next(); File hddsVolumeRootDir = volume.getHddsRootDir(); - Thread thread = new Thread(new ContainerReader(volume, + Thread thread = new Thread(new ContainerReader(volumeSet, volume, containerSet, config)); thread.start(); volumeThreads.add(thread); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java index fb8e7c1d05..8827d1df1d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -56,6 +56,13 @@ public class ScmTestMock implements 
StorageContainerDatanodeProtocol { private AtomicInteger heartbeatCount = new AtomicInteger(0); private AtomicInteger rpcCount = new AtomicInteger(0); private AtomicInteger containerReportsCount = new AtomicInteger(0); + private String clusterId; + private String scmId; + + public ScmTestMock() { + clusterId = UUID.randomUUID().toString(); + scmId = UUID.randomUUID().toString(); + } // Map of datanode to containers private Map> nodeContainers = @@ -157,8 +164,8 @@ public long getBytesUsed() { return VersionResponse.newBuilder() .setVersion(versionInfo.getVersion()) .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription()) - .addValue(OzoneConsts.SCM_ID, UUID.randomUUID().toString()) - .addValue(OzoneConsts.CLUSTER_ID, UUID.randomUUID().toString()) + .addValue(OzoneConsts.SCM_ID, scmId) + .addValue(OzoneConsts.CLUSTER_ID, clusterId) .build().getProtobufMessage(); } @@ -329,4 +336,20 @@ public void clearScmCommandRequests() { public void addScmCommandRequest(SCMCommandProto scmCmd) { scmCommandRequests.add(scmCmd); } + + /** + * Set scmId. + * @param id + */ + public void setScmId(String id) { + this.scmId = id; + } + + /** + * Get scmId. 
+ * @return scmId + */ + public String getScmId() { + return scmId; + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java index ece75459b9..59029db598 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java @@ -19,6 +19,7 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.ipc.RPC; @@ -57,9 +58,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.junit.Assert.assertTrue; /** @@ -68,7 +69,9 @@ public class TestDatanodeStateMachine { private static final Logger LOG = LoggerFactory.getLogger(TestDatanodeStateMachine.class); - private final int scmServerCount = 3; + // Changing it to 1, as current code checks for multiple scm directories, + // and fail if exists + private final int scmServerCount = 1; private List serverAddresses; private List scmServers; private List mockServers; @@ -90,7 +93,6 @@ public void setUp() throws Exception { String address = "127.0.0.1"; serverAddresses.add(address + ":" + port); ScmTestMock mock = new ScmTestMock(); - scmServers.add(SCMTestUtils.startScmRpcServer(conf, mock, new 
InetSocketAddress(address, port), 10)); mockServers.add(mock); @@ -107,7 +109,7 @@ public void setUp() throws Exception { } File dataDir = new File(testRoot, "data"); - conf.set(DFS_DATANODE_DATA_DIR_KEY, dataDir.getAbsolutePath()); + conf.set(HDDS_DATANODE_DIR_KEY, dataDir.getAbsolutePath()); if (!dataDir.mkdirs()) { LOG.info("Data dir create failed."); } @@ -145,7 +147,7 @@ public void tearDown() throws Exception { } catch (Exception e) { //ignore all execption from the shutdown } finally { - testRoot.delete(); + FileUtil.fullyDelete(testRoot); } } @@ -162,7 +164,7 @@ public void testStartStopDatanodeStateMachine() throws IOException, stateMachine.startDaemon(); SCMConnectionManager connectionManager = stateMachine.getConnectionManager(); - GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3, + GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 1, 1000, 30000); stateMachine.stopDaemon(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index be8bd8767f..6619d26826 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -70,8 +70,10 @@ .RegisterEndpointTask; import org.apache.hadoop.ozone.container.common.states.endpoint .VersionEndpointTask; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.util.Time; import org.junit.AfterClass; @@ -174,6 +176,53 @@ public void testGetVersionTask() throws Exception { } } + @Test + public void testCheckVersionResponse() 
throws Exception { + OzoneConfiguration conf = SCMTestUtils.getConf(); + try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, + serverAddress, 1000)) { + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(VersionEndpointTask.LOG); + OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(), + conf); + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf, ozoneContainer); + EndpointStateMachine.EndPointStates newState = versionTask.call(); + + // if version call worked the endpoint should automatically move to the + // next state. + Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER, + newState); + + // Now rpcEndpoint should remember the version it got from SCM + Assert.assertNotNull(rpcEndPoint.getVersion()); + + // Now change server scmId, so datanode scmId will be + // different from SCM server response scmId + String newScmId = UUID.randomUUID().toString(); + scmServerImpl.setScmId(newScmId); + newState = versionTask.call(); + Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN, + newState); + List volumesList = ozoneContainer.getVolumeSet() + .getFailedVolumesList(); + Assert.assertTrue(volumesList.size() == 1); + File expectedScmDir = new File(volumesList.get(0).getHddsRootDir(), + scmServerImpl.getScmId()); + Assert.assertTrue(logCapturer.getOutput().contains("expected scm " + + "directory " + expectedScmDir.getAbsolutePath() + " does not " + + "exist")); + Assert.assertTrue(ozoneContainer.getVolumeSet().getVolumesList().size() + == 0); + Assert.assertTrue(ozoneContainer.getVolumeSet().getFailedVolumesList() + .size() == 1); + + } + } + + + @Test /** * This test makes a call to end point where there is no SCM server. We