diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index df2e167cf3..b41da8302a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -56,6 +56,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getX509Certificate;
import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
@@ -84,6 +85,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
private HddsDatanodeHttpServer httpServer;
private boolean printBanner;
private String[] args;
+ private volatile AtomicBoolean isStopped = new AtomicBoolean(false);
public HddsDatanodeService(boolean printBanner, String[] args) {
this.printBanner = printBanner;
@@ -209,7 +211,7 @@ public void start() {
initializeCertificateClient(conf);
}
datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
- dnCertClient);
+ dnCertClient, this::terminateDatanode);
try {
httpServer = new HddsDatanodeHttpServer(conf);
httpServer.start();
@@ -421,29 +423,37 @@ public void join() {
}
}
+ /** Stops this service, then calls terminate(1); used as the stop callback. */
+ public void terminateDatanode() {
+ stop();
+ terminate(1);
+ }
+
@Override
public void stop() {
- if (plugins != null) {
- for (ServicePlugin plugin : plugins) {
+ // Atomic test-and-set: only the first caller runs the shutdown sequence.
+ if (isStopped.compareAndSet(false, true)) {
+ if (plugins != null) {
+ for (ServicePlugin plugin : plugins) {
+ try {
+ plugin.stop();
+ LOG.info("Stopped plug-in {}", plugin);
+ } catch (Throwable t) {
+ LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
+ }
+ }
+ }
+ if (datanodeStateMachine != null) {
+ datanodeStateMachine.stopDaemon();
+ }
+ if (httpServer != null) {
try {
- plugin.stop();
- LOG.info("Stopped plug-in {}", plugin);
- } catch (Throwable t) {
- LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
+ httpServer.stop();
+ } catch (Exception e) {
+ LOG.error("Stopping HttpServer is failed.", e);
}
}
}
- if (datanodeStateMachine != null) {
- datanodeStateMachine.stopDaemon();
- }
- if (httpServer != null) {
- try {
- httpServer.stop();
- } catch (Exception e) {
- LOG.error("Stopping HttpServer is failed.", e);
- }
- }
-
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeStopService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeStopService.java
new file mode 100644
index 0000000000..02c1431fb3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeStopService.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+/** Single-method callback used to stop HddsDatanodeService. */
+@FunctionalInterface
+public interface HddsDatanodeStopService {
+
+ /** Stops the datanode service. */
+ void stopService();
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 69782efbfd..0119d23fe6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CloseContainerCommandHandler;
@@ -84,6 +85,7 @@ public class DatanodeStateMachine implements Closeable {
private JvmPauseMonitor jvmPauseMonitor;
private CertificateClient dnCertClient;
+ private final HddsDatanodeStopService hddsDatanodeStopService;
/**
* Constructs a a datanode state machine.
@@ -93,7 +95,9 @@ public class DatanodeStateMachine implements Closeable {
* enabled
*/
public DatanodeStateMachine(DatanodeDetails datanodeDetails,
- Configuration conf, CertificateClient certClient) throws IOException {
+ Configuration conf, CertificateClient certClient,
+ HddsDatanodeStopService hddsDatanodeStopService) throws IOException {
+ this.hddsDatanodeStopService = hddsDatanodeStopService;
this.conf = conf;
this.datanodeDetails = datanodeDetails;
executorService = HadoopExecutors.newCachedThreadPool(
@@ -195,6 +199,14 @@ private void start() throws IOException {
LOG.error("Unable to finish the execution.", e);
}
}
+
+ // If we have got some exception in stateMachine we set the state to
+ // shutdown to stop the stateMachine thread. Along with this we should
+ // also stop the datanode.
+ if (context.getShutdownOnError()) {
+ LOG.error("DatanodeStateMachine Shutdown due to an critical error");
+ hddsDatanodeStopService.stopService();
+ }
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 56151f87d1..2c01f3a73d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -73,6 +73,7 @@ public class StateContext {
private final Queue containerActions;
private final Queue pipelineActions;
private DatanodeStateMachine.DatanodeStates state;
+ private boolean shutdownOnError = false;
/**
* Starting with a 2 sec heartbeat frequency which will be updated to the
@@ -152,6 +153,22 @@ public void setState(DatanodeStateMachine.DatanodeStates state) {
this.state = state;
}
+ /**
+ * Sets the shutdownOnError flag. execute() passes true when it finds the
+ * DatanodeState set to SHUTDOWN after running a state's task.
+ * @param value new value of the shutdownOnError flag
+ */
+ private void setShutdownOnError(boolean value) {
+ this.shutdownOnError = value;
+ }
+
+ /**
+ * Returns the shutdownOnError flag.
+ * @return the current value of the shutdownOnError flag
+ */
+ public boolean getShutdownOnError() {
+ return shutdownOnError;
+ }
/**
* Adds the report to report queue.
*
@@ -367,6 +384,14 @@ public void execute(ExecutorService service, long time, TimeUnit unit)
}
this.setState(newState);
}
+
+ if (this.state == DatanodeStateMachine.DatanodeStates.SHUTDOWN) {
+ LOG.error("Critical error occurred in StateMachine, setting " +
+ "shutDownMachine");
+ // When an exception has occurred, set shutdownOnError to true so
+ // that we can terminate the datanode.
+ setShutdownOnError(true);
+ }
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index 6fba4fb136..875e96a0a9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -167,8 +167,6 @@ private void initializeVolumeSet() throws IOException {
checkAndSetClusterID(hddsVolume.getClusterID());
- volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
- volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
LOG.info("Added Volume : {} to VolumeSet",
hddsVolume.getHddsRootDir().getPath());
@@ -177,6 +175,8 @@ private void initializeVolumeSet() throws IOException {
throw new IOException("Failed to create HDDS storage dir " +
hddsVolume.getHddsRootDir());
}
+ volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
+ volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
} catch (IOException e) {
HddsVolume volume = new HddsVolume.Builder(locationString)
.failedVolume(true).build();
@@ -185,12 +185,14 @@ private void initializeVolumeSet() throws IOException {
}
}
- checkAllVolumes();
-
+ // First check whether we have any volumes. If all volumes have failed,
+ // volumeMap is empty and we throw an exception.
if (volumeMap.size() == 0) {
throw new DiskOutOfSpaceException("No storage locations configured");
}
+ checkAllVolumes();
+
// Ensure volume threads are stopped and scm df is saved during shutdown.
shutdownHook = () -> {
saveVolumeSetUsed();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index e9c5e3ec95..9840f01bb3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -160,7 +160,7 @@ public void tearDown() throws Exception {
public void testStartStopDatanodeStateMachine() throws IOException,
InterruptedException, TimeoutException {
try (DatanodeStateMachine stateMachine =
- new DatanodeStateMachine(getNewDatanodeDetails(), conf, null)) {
+ new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null)) {
stateMachine.startDaemon();
SCMConnectionManager connectionManager =
stateMachine.getConnectionManager();
@@ -222,7 +222,7 @@ public void testDatanodeStateContext() throws IOException,
ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
try (DatanodeStateMachine stateMachine =
- new DatanodeStateMachine(datanodeDetails, conf, null)) {
+ new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -343,7 +343,7 @@ public void testDatanodeStateMachineWithIdWriteFail() throws Exception {
datanodeDetails.setPort(port);
try (DatanodeStateMachine stateMachine =
- new DatanodeStateMachine(datanodeDetails, conf, null)) {
+ new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -406,7 +406,7 @@ public void testDatanodeStateMachineWithInvalidConfiguration()
perTestConf.setStrings(entry.getKey(), entry.getValue());
LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
- getNewDatanodeDetails(), perTestConf, null)) {
+ getNewDatanodeDetails(), perTestConf, null, null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
index 5cb218c5ed..472bb9891c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -38,6 +38,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.hamcrest.CoreMatchers.is;
import org.junit.After;
+
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.junit.Rule;
@@ -125,14 +127,14 @@ HddsVolumeChecker getVolumeChecker(Configuration configuration)
}
/**
- * Verify that initialization fails if all volumes are bad.
+ * Verify that all volumes are added to fail list if all volumes are bad.
*/
@Test
public void testAllVolumesAreBad() throws IOException {
final int numVolumes = 5;
conf = getConfWithDataNodeDirs(numVolumes);
- thrown.expect(IOException.class);
+
final VolumeSet volumeSet = new VolumeSet(
UUID.randomUUID().toString(), conf) {
@Override
@@ -141,6 +143,9 @@ HddsVolumeChecker getVolumeChecker(Configuration configuration)
return new DummyChecker(configuration, new Timer(), numVolumes);
}
};
+
+ assertEquals(volumeSet.getFailedVolumesList().size(), numVolumes);
+ assertEquals(volumeSet.getVolumesList().size(), 0);
}
/**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 4b03474f38..6b493edb68 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -175,6 +175,10 @@ public void testGetVersionTask() throws Exception {
@Test
public void testCheckVersionResponse() throws Exception {
OzoneConfiguration conf = SCMTestUtils.getConf();
+ conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+ true);
+ conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
+ true);
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
serverAddress, 1000)) {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
@@ -478,7 +482,7 @@ private StateContext heartbeatTaskHelper(InetSocketAddress scmAddress,
// Create a datanode state machine for stateConext used by endpoint task
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
- TestUtils.randomDatanodeDetails(), conf, null);
+ TestUtils.randomDatanodeDetails(), conf, null, null);
EndpointStateMachine rpcEndPoint =
createEndpoint(conf, scmAddress, rpcTimeout)) {
HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index cd2c38106f..a07457ab4c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -175,11 +175,11 @@ public void testContainerRandomPort() throws IOException {
true);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(
- TestUtils.randomDatanodeDetails(), ozoneConf, null);
+ TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm2 = new DatanodeStateMachine(
- TestUtils.randomDatanodeDetails(), ozoneConf, null);
+ TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm3 = new DatanodeStateMachine(
- TestUtils.randomDatanodeDetails(), ozoneConf, null)
+ TestUtils.randomDatanodeDetails(), ozoneConf, null, null)
) {
HashSet ports = new HashSet();
assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
@@ -198,11 +198,11 @@ public void testContainerRandomPort() throws IOException {
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(
- TestUtils.randomDatanodeDetails(), ozoneConf, null);
+ TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm2 = new DatanodeStateMachine(
- TestUtils.randomDatanodeDetails(), ozoneConf, null);
+ TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm3 = new DatanodeStateMachine(
- TestUtils.randomDatanodeDetails(), ozoneConf, null)
+ TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
) {
HashSet ports = new HashSet();
assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));