diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 1fcaaf5182..10fec6059c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -154,6 +154,8 @@ ContainerReplicaProto getContainerReport()
/**
* check and report the structural integrity of the container.
+ * @return true if the integrity checks pass,
+ *         false otherwise.
*/
- void check() throws StorageContainerException;
+ boolean check();
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 513043f6fd..c50f4573d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -135,8 +135,12 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
addReports(requestBuilder);
addContainerActions(requestBuilder);
addPipelineActions(requestBuilder);
+ SCMHeartbeatRequestProto request = requestBuilder.build();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending heartbeat message :: {}", request.toString());
+ }
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
- .sendHeartbeat(requestBuilder.build());
+ .sendHeartbeat(request);
processResponse(reponse, datanodeDetailsProto);
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 6a1ca86821..a818b511ba 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -648,7 +648,7 @@ public File getContainerDBFile() {
/**
* run integrity checks on the Container metadata.
*/
- public void check() throws StorageContainerException {
+ public boolean check() {
ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK;
long containerId = containerData.getContainerID();
@@ -671,14 +671,12 @@ public void check() throws StorageContainerException {
containerData.getState());
break;
default:
- throw new StorageContainerException(
- "Invalid Container state found for Container : " + containerData
- .getContainerID(), INVALID_CONTAINER_STATE);
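+ // unknown or invalid states are not scrubbed; level stays NO_CHECK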
+ break;
}
if (level == ContainerCheckLevel.NO_CHECK) {
LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
- return;
+ return true;
}
KeyValueContainerCheck checker =
@@ -687,17 +685,11 @@ public void check() throws StorageContainerException {
switch (level) {
case FAST_CHECK:
- checker.fastCheck();
- break;
+ return checker.fastCheck();
case FULL_CHECK:
- checker.fullCheck();
- break;
- case NO_CHECK:
- LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
- break;
+ return checker.fullCheck();
default:
- // we should not be here at all, scuttle the ship!
- Preconditions.checkNotNull(0, "Invalid Containercheck level");
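+ // NO_CHECK is handled before the switch; treat any other level as a pass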
+ return true;
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index ebbd4e01d0..3e252bf0b3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -72,37 +72,22 @@ public KeyValueContainerCheck(String metadataPath, Configuration conf,
* These checks do not look inside the metadata files.
* Applicable for OPEN containers.
*
- * @return true : corruption detected, false : no corruption.
+ * @return true if the integrity checks pass, false otherwise.
*/
public boolean fastCheck() {
- boolean corruption = false;
+ LOG.info("Running basic checks for container {};", containerID);
+ boolean valid = false;
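+ // corruption in the layout or container file surfaces as an IOException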
try {
- basicChecks();
+ loadContainerData();
+ checkLayout();
+ checkContainerFile();
+ valid = true;
} catch (IOException e) {
handleCorruption(e);
- corruption = true;
}
- return corruption;
- }
-
- /**
- * Checks :
- * 1. check directory layout
- * 2. check container file
- *
- * @return void
- */
-
- private void basicChecks() throws IOException {
-
- LOG.trace("Running basic checks for container {};", containerID);
-
- loadContainerData();
-
- checkLayout();
- checkContainerFile();
+ return valid;
}
/**
@@ -114,21 +99,22 @@ private void basicChecks() throws IOException {
*
* fullCheck is a superset of fastCheck
*
- * @return true : corruption detected, false : no corruption.
+ * @return true if the integrity checks pass, false otherwise.
*/
public boolean fullCheck() {
- boolean corruption = false;
+ boolean valid = false;
try {
- basicChecks();
- checkBlockDB();
-
+ valid = fastCheck();
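+ // look inside the block DB only if the basic checks pass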
+ if (valid) {
+ checkBlockDB();
+ }
} catch (IOException e) {
handleCorruption(e);
- corruption = true;
+ valid = false;
}
- return corruption;
+ return valid;
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 6755ba2824..10f94dc6ea 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -900,8 +900,17 @@ public void markContainerForClose(Container container)
public void markContainerUnhealthy(Container container)
throws IOException {
if (container.getContainerState() != State.UNHEALTHY) {
- container.markContainerUnhealthy();
- sendICR(container);
+ try {
+ container.markContainerUnhealthy();
+ } catch (IOException ex) {
+ // explicitly catch IOException here since this operation
+ // will fail if the RocksDB metadata is corrupted.
+ long id = container.getContainerData().getContainerID();
+ LOG.warn("Unexpected error while marking container {} as unhealthy",
+ id, ex);
+ } finally {
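+ // always send the ICR so SCM learns about the unhealthy replica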
+ sendICR(container);
+ }
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index f84a6a945f..f643c1f11b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -73,6 +73,18 @@ public void markContainerForClose(final long containerId)
}
}
+ /**
+ * Marks the container as UNHEALTHY.
+ *
+ * @param containerId Id of the container to update
+ * @throws IOException if the container state update fails
+ */
+ public void markContainerUnhealthy(final long containerId)
+ throws IOException {
+ Container container = containerSet.getContainer(containerId);
+ getHandler(container).markContainerUnhealthy(container);
+ }
+
/**
* Returns the container report.
*
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java
index 380dc9e4eb..ac473a419c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.net.ntp.TimeStamp;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Iterator;
/**
@@ -56,11 +57,7 @@ public ContainerScrubber(OzoneConfiguration conf,
LOG.info("Background ContainerScrubber starting up");
while (true) {
- try {
- scrub();
- } catch (StorageContainerException e) {
- LOG.error("Scrubber encountered StorageContainerException.");
- }
+ scrub();
if (this.halt) {
break; // stop and exit if requested
@@ -129,22 +126,20 @@ private void throttleScrubber(TimeStamp startTime) {
}
}
- private void scrub() throws StorageContainerException {
+ private void scrub() {
Iterator containerIt = controller.getContainers();
long count = 0;
while (containerIt.hasNext() && !halt) {
TimeStamp startTime = new TimeStamp(System.currentTimeMillis());
Container container = containerIt.next();
-
try {
- container.check();
- } catch (StorageContainerException e) {
- LOG.error("Error unexpected exception {} for Container {}", e,
- container.getContainerData().getContainerID());
- container.markContainerUnhealthy();
- // XXX Action required here
+ scrub(container);
+ } catch (IOException e) {
+ LOG.info("Unexpected error while scrubbing container {}",
+ container.getContainerData().getContainerID());
}
+
count++;
throttleScrubber(startTime);
@@ -152,4 +147,12 @@ private void scrub() throws StorageContainerException {
LOG.debug("iterator ran integrity checks on {} containers", count);
}
+
+ @VisibleForTesting
+ public void scrub(Container container) throws IOException {
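+ // a failed check marks the replica unhealthy, triggering an ICR to SCM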
+ if (!container.check()) {
+ controller.markContainerUnhealthy(
+ container.getContainerData().getContainerID());
+ }
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index 4ef77e4565..5dccca6e37 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -55,7 +55,6 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@@ -101,7 +100,7 @@ public TestKeyValueContainerCheck(String metadataImpl) {
int deletedBlocks = 1;
int normalBlocks = 3;
int chunksPerBlock = 4;
- boolean corruption = false;
+ boolean valid = false;
// test Closed Container
createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536,
@@ -115,14 +114,14 @@ public TestKeyValueContainerCheck(String metadataImpl) {
containerID);
// first run checks on a Open Container
- corruption = kvCheck.fastCheck();
- assertFalse(corruption);
+ valid = kvCheck.fastCheck();
+ assertTrue(valid);
container.close();
// next run checks on a Closed Container
- corruption = kvCheck.fullCheck();
- assertFalse(corruption);
+ valid = kvCheck.fullCheck();
+ assertTrue(valid);
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
index 042fd56b6a..a7efb55754 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -47,6 +47,8 @@ public IncrementalContainerReportHandler(
@Override
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
+ LOG.debug("Processing incremental container report from data node {}",
+ report.getDatanodeDetails().getUuid());
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
new file mode 100644
index 0000000000..abed3ebe48
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.dn.scrubber;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubber;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.UUID;
+import java.io.File;
+
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+
+/**
+ * This class tests the data scrubber functionality.
+ */
+public class TestDataScrubber {
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration ozoneConfig;
+ private static OzoneClient ozClient = null;
+ private static ObjectStore store = null;
+ private static OzoneManager ozoneManager;
+ private static StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ ozoneConfig = new OzoneConfiguration();
+ ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
+ ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
+ .build();
+ cluster.waitForClusterToBeReady();
+ ozClient = OzoneClientFactory.getRpcClient(ozoneConfig);
+ store = ozClient.getObjectStore();
+ ozoneManager = cluster.getOzoneManager();
+ storageContainerLocationClient =
+ cluster.getStorageContainerLocationClient();
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ if (ozClient != null) {
+ ozClient.close();
+ }
+ if (storageContainerLocationClient != null) {
+ storageContainerLocationClient.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testOpenContainerIntegrity() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ long currentTime = Time.now();
+
+ String value = "sample value";
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ for (int i = 0; i < 10; i++) {
+ String keyName = UUID.randomUUID().toString();
+
+ OzoneOutputStream out = bucket.createKey(keyName,
+ value.getBytes().length, STAND_ALONE,
+ ONE, new HashMap<>());
+ out.write(value.getBytes());
+ out.close();
+ OzoneKey key = bucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ OzoneInputStream is = bucket.readKey(keyName);
+ byte[] fileContent = new byte[value.getBytes().length];
+ is.read(fileContent);
+ Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+ keyName, STAND_ALONE,
+ ONE));
+ Assert.assertEquals(value, new String(fileContent));
+ Assert.assertTrue(key.getCreationTime() >= currentTime);
+ Assert.assertTrue(key.getModificationTime() >= currentTime);
+ }
+
+ // wait for the container report to propagate to SCM
+ Thread.sleep(5000);
+
+ Assert.assertEquals(1, cluster.getHddsDatanodes().size());
+
+ HddsDatanodeService dn = cluster.getHddsDatanodes().get(0);
+ OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
+ ContainerSet cs = oc.getContainerSet();
+ Assert.assertTrue(cs.containerCount() > 0);
+ Container c = cs.getContainerIterator().next();
+
+ // delete the chunks directory.
+ File chunksDir =
+ new File(c.getContainerData().getContainerPath(), "chunks");
+ deleteDirectory(chunksDir);
+ Assert.assertFalse(chunksDir.exists());
+
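+ // scrub the container; the missing chunks dir should fail the check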
+ ContainerScrubber sb =
+ new ContainerScrubber(ozoneConfig, oc.getController());
+ sb.scrub(c);
+
+ // wait for the incremental container report to propagate to SCM
+ Thread.sleep(5000);
+
+ ContainerManager cm =
+ cluster.getStorageContainerManager().getContainerManager();
+ Set<ContainerReplica> replicas = cm.getContainerReplicas(
+ ContainerID.valueof(c.getContainerData().getContainerID()));
+ Assert.assertEquals(1, replicas.size());
+ ContainerReplica r = replicas.iterator().next();
+ Assert.assertEquals(StorageContainerDatanodeProtocolProtos.
+ ContainerReplicaProto.State.UNHEALTHY, r.getState());
+ }
+
+ boolean deleteDirectory(File directoryToBeDeleted) {
+ File[] allContents = directoryToBeDeleted.listFiles();
+ if (allContents != null) {
+ for (File file : allContents) {
+ deleteDirectory(file);
+ }
+ }
+ return directoryToBeDeleted.delete();
+ }
+
+ private boolean verifyRatisReplication(String volumeName, String bucketName,
+ String keyName, ReplicationType type, ReplicationFactor factor)
+ throws IOException {
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setRefreshPipeline(true)
+ .build();
+ HddsProtos.ReplicationType replicationType =
+ HddsProtos.ReplicationType.valueOf(type.toString());
+ HddsProtos.ReplicationFactor replicationFactor =
+ HddsProtos.ReplicationFactor.valueOf(factor.getValue());
+ OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
+ for (OmKeyLocationInfo info :
+ keyInfo.getLatestVersionLocations().getLocationList()) {
+ ContainerInfo container =
+ storageContainerLocationClient.getContainer(info.getContainerID());
+ if (!container.getReplicationFactor().equals(replicationFactor) || (
+ container.getReplicationType() != replicationType)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}