diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 3dd28f6f0a..3bb38950d0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -65,6 +65,9 @@ public final class HddsConfigKeys {
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
+  public static final String HDDS_CONTAINERSCRUB_ENABLED =
+      "hdds.containerscrub.enabled";
+  public static final boolean HDDS_CONTAINERSCRUB_ENABLED_DEFAULT = false;
public static final String HDDS_SCM_CHILLMODE_ENABLED =
    "hdds.scm.chillmode.enabled";
public static final boolean HDDS_SCM_CHILLMODE_ENABLED_DEFAULT = true;
public static final String HDDS_SCM_CHILLMODE_MIN_DATANODE =
"hdds.scm.chillmode.min.datanode";
@@ -255,4 +258,4 @@ private HddsConfigKeys() {
public static final String
HDDS_DATANODE_HTTP_KERBEROS_KEYTAB_FILE_KEY =
"hdds.datanode.http.kerberos.keytab";
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 54eb5c83a1..331b5c4a2e 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1346,6 +1346,16 @@
+  <property>
+    <name>hdds.containerscrub.enabled</name>
+    <value>false</value>
+    <tag>DATANODE</tag>
+    <description>
+      Boolean value to enable data and metadata scrubbing in the containers
+      running on each datanode.
+    </description>
+  </property>
+
  <property>
    <name>hdds.container.action.max.limit</name>
    <value>20</value>
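
Reviewer note: besides setting the property in ozone-site.xml, the scrubber
can be toggled programmatically, e.g. in tests. A minimal sketch, assuming
only the constants added by this patch:

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, true);
    // OzoneContainer.start() will then bring up the background scrubber
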
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 89f09fd345..1fcaaf5182 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -151,4 +151,9 @@ ContainerReplicaProto getContainerReport()
* updates the blockCommitSequenceId.
*/
void updateBlockCommitSequenceId(long blockCommitSequenceId);
+
+  /**
+   * Check and report the structural integrity of the container.
+   *
+   * @throws StorageContainerException if corruption or an invalid
+   * container state is detected
+   */
+  void check() throws StorageContainerException;
}
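
Reviewer note: a StorageContainerException from check() signals a
corruption or invalid-state verdict, not a transient I/O failure; the
handling policy itself is deferred to HDDS-1201. A hedged usage sketch
(LOG being the caller's own logger):

    try {
      container.check();
    } catch (StorageContainerException e) {
      // corruption or an invalid container state was detected
      LOG.error("Integrity check failed for container {}",
          container.getContainerData().getContainerID(), e);
    }
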
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index de1b109f39..20dfd9c925 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -565,8 +565,13 @@ public void writeLockInterruptibly() throws InterruptedException {
*/
@Override
public File getContainerFile() {
- return new File(containerData.getMetadataPath(), containerData
- .getContainerID() + OzoneConsts.CONTAINER_EXTENSION);
+ return getContainerFile(containerData.getMetadataPath(),
+ containerData.getContainerID());
+ }
+
+ static File getContainerFile(String metadataPath, long containerId) {
+ return new File(metadataPath,
+ containerId + OzoneConsts.CONTAINER_EXTENSION);
}
@Override
@@ -634,6 +639,66 @@ public File getContainerDBFile() {
.getContainerID() + OzoneConsts.DN_CONTAINER_DB);
}
+  /**
+   * Run integrity checks on the Container metadata.
+   */
+  @Override
+  public void check() throws StorageContainerException {
+ ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK;
+ long containerId = containerData.getContainerID();
+
+    switch (containerData.getState()) {
+    case OPEN:
+    case CLOSING:
+      level = ContainerCheckLevel.FAST_CHECK;
+      LOG.info("Doing fast integrity checks for Container ID : {},"
+          + " because it is in {} state", containerId,
+          containerData.getState());
+      break;
+    case CLOSED:
+    case QUASI_CLOSED:
+      level = ContainerCheckLevel.FULL_CHECK;
+      LOG.debug("Doing full integrity checks for Container ID : {},"
+          + " because it is in {} state", containerId,
+          containerData.getState());
+      break;
+    default:
+      throw new StorageContainerException(
+          "Invalid Container state found for Container : " + containerId,
+          INVALID_CONTAINER_STATE);
+    }
+
+ if (level == ContainerCheckLevel.NO_CHECK) {
+ LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
+ return;
+ }
+
+ KeyValueContainerCheck checker =
+ new KeyValueContainerCheck(containerData.getMetadataPath(), config,
+ containerId, containerData);
+
+    switch (level) {
+    case FAST_CHECK:
+      checker.fastCheck();
+      break;
+    case FULL_CHECK:
+      checker.fullCheck();
+      break;
+    default:
+      // level is always FAST_CHECK or FULL_CHECK here; NO_CHECK returned
+      // early above. Fail loudly on any impossible value.
+      throw new IllegalStateException("Invalid ContainerCheck level: "
+          + level);
+    }
+ }
+
+ private enum ContainerCheckLevel {
+ NO_CHECK, FAST_CHECK, FULL_CHECK
+ }
+
/**
* Creates a temporary file.
* @param file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
new file mode 100644
index 0000000000..5366c27c77
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
+
+/**
+ * Class to run integrity checks on Datanode Containers.
+ * Provides the infrastructure for data scrubbing.
+ */
+public class KeyValueContainerCheck {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KeyValueContainerCheck.class);
+
+ private long containerID;
+ private KeyValueContainerData inMemContainerData; //from caller, maybe null
+ private KeyValueContainerData onDiskContainerData; //loaded from fs/disk
+ private Configuration checkConfig;
+
+ private String metadataPath;
+
+ public KeyValueContainerCheck(String metadataPath, Configuration conf,
+ long containerID, KeyValueContainerData containerData) {
+ Preconditions.checkArgument(metadataPath != null);
+
+ this.checkConfig = conf;
+ this.containerID = containerID;
+ this.onDiskContainerData = null;
+ this.inMemContainerData = containerData;
+ this.metadataPath = metadataPath;
+ }
+
+  /**
+   * Fast checks are basic and do not look inside the metadata files
+   * or into the structures on disk. These checks can be done on Open
+   * containers as well, without concurrency implications.
+   * Checks :
+   * 1. check directory layout
+   * 2. check container file
+   *
+   * @return KvCheckError.ERROR_NONE if all checks pass, else the first
+   * error encountered
+   */
+ public KvCheckError fastCheck() {
+
+ KvCheckError error;
+ LOG.trace("Running fast check for container {};", containerID);
+
+ error = loadContainerData();
+ if (error != KvCheckError.ERROR_NONE) {
+ return error;
+ }
+
+ error = checkLayout();
+ if (error != KvCheckError.ERROR_NONE) {
+ return error;
+ }
+
+ error = checkContainerFile();
+
+ return error;
+ }
+
+  /**
+   * Full checks comprise scanning all metadata inside the container,
+   * including the KV database. These checks are intrusive, consume more
+   * resources than fast checks, and should only be done on Closed or
+   * Quasi-closed Containers, where concurrency is limited to delete
+   * workflows.
+   *
+   * fullCheck is a superset of fastCheck.
+   *
+   * @return KvCheckError.ERROR_NONE if all checks pass, else the first
+   * error encountered
+   */
+  public KvCheckError fullCheck() {
+    KvCheckError error;
+
+ error = fastCheck();
+ if (error != KvCheckError.ERROR_NONE) {
+
+ LOG.trace("fastCheck failed, aborting full check for Container {}",
+ containerID);
+ return error;
+ }
+
+ error = checkBlockDB();
+
+ return error;
+ }
+
+ /**
+ * Check the integrity of the directory structure of the container.
+ *
+ * @return error code or ERROR_NONE
+ */
+  private KvCheckError checkLayout() {
+    KvCheckError error = KvCheckError.ERROR_NONE;
+
+ // is metadataPath accessible as a directory?
+ try {
+ checkDirPath(metadataPath);
+ } catch (IOException ie) {
+ error = KvCheckError.METADATA_PATH_ACCESS;
+ handleCorruption(ie.getMessage(), error, ie);
+ return error;
+ }
+
+ String chunksPath = onDiskContainerData.getChunksPath();
+ // is chunksPath accessible as a directory?
+ try {
+ checkDirPath(chunksPath);
+ } catch (IOException ie) {
+ error = KvCheckError.CHUNKS_PATH_ACCESS;
+ handleCorruption(ie.getMessage(), error, ie);
+ return error;
+ }
+
+ return error;
+ }
+
+  private void checkDirPath(String path) throws IOException {
+
+    File dirPath = new File(path);
+
+    try {
+      if (!dirPath.isDirectory()) {
+        throw new IOException("Not a directory [" + path + "]");
+      }
+    } catch (SecurityException se) {
+      throw new IOException("Security exception checking dir ["
+          + path + "]", se);
+    }
+
+    try {
+      String[] ls = dirPath.list();
+      if (ls == null) {
+        // a null listing means the directory could not be read
+        throw new IOException("null listing for directory [" + path + "]");
+      }
+    } catch (SecurityException se) {
+      throw new IOException("Exception listing dir [" + path + "]", se);
+    }
+  }
+
+ private KvCheckError checkContainerFile() {
+    /*
+     * Compare the values in the container file loaded from disk
+     * with the values we are expecting.
+     */
+ KvCheckError error = KvCheckError.ERROR_NONE;
+ String dbType;
+ Preconditions
+ .checkState(onDiskContainerData != null, "Container File not loaded");
+ KvCheckAction next;
+
+ try {
+ ContainerUtils.verifyChecksum(onDiskContainerData);
+ } catch (Exception e) {
+ error = KvCheckError.CONTAINERDATA_CKSUM;
+ handleCorruption("Container File Checksum mismatch", error, e);
+ return error;
+ }
+
+ if (onDiskContainerData.getContainerType()
+ != ContainerProtos.ContainerType.KeyValueContainer) {
+ String errStr = "Bad Container type in Containerdata for " + containerID;
+ error = KvCheckError.CONTAINERDATA_TYPE;
+ handleCorruption(errStr, error, null);
+ return error; // Abort if we do not know the type of Container
+ }
+
+ if (onDiskContainerData.getContainerID() != containerID) {
+ String errStr =
+ "Bad ContainerID field in Containerdata for " + containerID;
+ error = KvCheckError.CONTAINERDATA_ID;
+ next = handleCorruption(errStr, error, null);
+ if (next == KvCheckAction.ABORT) {
+ return error;
+ } // else continue checking other data elements
+ }
+
+ dbType = onDiskContainerData.getContainerDBType();
+ if (!dbType.equals(OZONE_METADATA_STORE_IMPL_ROCKSDB) &&
+ !dbType.equals(OZONE_METADATA_STORE_IMPL_LEVELDB)) {
+ String errStr = "Unknown DBType [" + dbType
+ + "] in Container File for [" + containerID + "]";
+ error = KvCheckError.CONTAINERDATA_DBTYPE;
+ handleCorruption(errStr, error, null);
+ return error;
+ }
+
+    KeyValueContainerData kvData = onDiskContainerData;
+    if (!metadataPath.equals(kvData.getMetadataPath())) {
+      String errStr =
+          "Bad metadata path in Containerdata for " + containerID
+              + ". Expected [" + metadataPath + "] Got ["
+              + kvData.getMetadataPath() + "]";
+ error = KvCheckError.CONTAINERDATA_METADATA_PATH;
+ next = handleCorruption(errStr, error, null);
+ if (next == KvCheckAction.ABORT) {
+ return error;
+ }
+ }
+
+ return error;
+ }
+
+ private KvCheckError checkBlockDB() {
+    /*
+     * Check the integrity of the DB inside each container.
+     * In scope:
+     * 1. iterate over each key (Block) and locate the chunks for the block
+     * 2. garbage detection: chunks which exist in the filesystem but not
+     *    in the DB (implemented separately as HDDS-1202)
+     * Not in scope:
+     * 1. chunk checksum verification; this is left to a separate,
+     *    slower chunk scanner
+     */
+ KvCheckError error;
+ Preconditions.checkState(onDiskContainerData != null,
+ "invoke loadContainerData prior to calling this function");
+ File dbFile;
+ File metaDir = new File(metadataPath);
+
+ try {
+ dbFile = KeyValueContainerLocationUtil
+ .getContainerDBFile(metaDir, containerID);
+
+ if (!dbFile.exists() || !dbFile.canRead()) {
+
+ String dbFileErrorMsg = "Unable to access DB File [" + dbFile.toString()
+ + "] for Container [" + containerID + "] metadata path ["
+ + metadataPath + "]";
+ error = KvCheckError.DB_ACCESS;
+ handleCorruption(dbFileErrorMsg, error, null);
+ return error;
+ }
+ } catch (Exception e) {
+      String dbFileErrorMessage =
+          "Exception when initializing DBFile with metadata path ["
+              + metadataPath + "] for Container [" + containerID
+              + "]";
+ error = KvCheckError.DB_ACCESS;
+ handleCorruption(dbFileErrorMessage, error, e);
+ return error;
+ }
+ onDiskContainerData.setDbFile(dbFile);
+
+ try {
+ MetadataStore db = BlockUtils
+ .getDB(onDiskContainerData, checkConfig);
+ error = iterateBlockDB(db);
+ } catch (Exception e) {
+ error = KvCheckError.DB_ITERATOR;
+ handleCorruption("Block DB Iterator aborted", error, e);
+ return error;
+ }
+
+ return error;
+ }
+
+ private KvCheckError iterateBlockDB(MetadataStore db)
+ throws IOException {
+ KvCheckError error = KvCheckError.ERROR_NONE;
+ Preconditions.checkState(db != null);
+
+ // get "normal" keys from the Block DB
+ KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
+ new File(onDiskContainerData.getContainerPath()));
+
+ // ensure there is a chunk file for each key in the DB
+ while (kvIter.hasNext()) {
+ BlockData block = kvIter.nextBlock();
+
+      List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
+ for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
+ File chunkFile;
+ try {
+ chunkFile = ChunkUtils
+ .getChunkFile(onDiskContainerData,
+ ChunkInfo.getFromProtoBuf(chunk));
+ } catch (Exception e) {
+ error = KvCheckError.MISSING_CHUNK_FILE;
+ handleCorruption("Unable to access chunk path", error, e);
+ return error;
+ }
+
+ if (!chunkFile.exists()) {
+ error = KvCheckError.MISSING_CHUNK_FILE;
+
+ // concurrent mutation in Block DB? lookup the block again.
+ byte[] bdata = db.get(
+ Longs.toByteArray(block.getBlockID().getLocalID()));
+ if (bdata == null) {
+ LOG.trace("concurrency with delete, ignoring deleted block");
+ error = KvCheckError.ERROR_NONE;
+ break; // skip to next block from kvIter
+ } else {
+ handleCorruption("Missing chunk file", error, null);
+ return error;
+ }
+ }
+ }
+ }
+
+ return error;
+ }
+
+ private KvCheckError loadContainerData() {
+ KvCheckError error = KvCheckError.ERROR_NONE;
+
+    File containerFile = KeyValueContainer
+        .getContainerFile(metadataPath, containerID);
+
+ try {
+ onDiskContainerData = (KeyValueContainerData) ContainerDataYaml
+ .readContainerFile(containerFile);
+ } catch (IOException e) {
+ error = KvCheckError.FILE_LOAD;
+ handleCorruption("Unable to load Container File", error, e);
+ }
+
+ return error;
+ }
+
+ private KvCheckAction handleCorruption(String reason,
+ KvCheckError error, Exception e) {
+
+ // XXX HDDS-1201 need to implement corruption handling/reporting
+
+ String errStr =
+ "Corruption detected in container: [" + containerID + "] reason: ["
+ + reason + "] error code: [" + error + "]";
+    StackTraceElement[] stackElems = Thread.currentThread().getStackTrace();
+    String caller =
+        "Corruption reported from Source File: [" + stackElems[2].getFileName()
+            + "] Line: [" + stackElems[2].getLineNumber() + "]";
+
+    LOG.error(caller);
+    if (e != null) {
+      // log through SLF4J so the stack trace is captured with the message
+      LOG.error(errStr + " exception: [" + e.getMessage() + "]", e);
+    } else {
+      LOG.error(errStr);
+    }
+
+ return KvCheckAction.ABORT;
+ }
+
+ /**
+ * Pre-defined error codes for Container Metadata check.
+ */
+ public enum KvCheckError {
+ ERROR_NONE,
+ FILE_LOAD, // unable to load container metafile
+ METADATA_PATH_ACCESS, // metadata path is not accessible
+ CHUNKS_PATH_ACCESS, // chunks path is not accessible
+ CONTAINERDATA_ID, // bad Container-ID stored in Container file
+ CONTAINERDATA_METADATA_PATH, // bad metadata path in Container file
+ CONTAINERDATA_CHUNKS_PATH, // bad chunks path in Container file
+ CONTAINERDATA_CKSUM, // container file checksum mismatch
+ CONTAINERDATA_TYPE, // container file incorrect type of Container
+ CONTAINERDATA_DBTYPE, // unknown DB Type specified in Container File
+ DB_ACCESS, // unable to load Metastore DB
+ DB_ITERATOR, // unable to create block iterator for Metastore DB
+ MISSING_CHUNK_FILE // chunk file not found
+ }
+
+ private enum KvCheckAction {
+ CONTINUE, // Continue with remaining checks on the corrupt Container
+ ABORT // Abort checks for the container
+ }
+}
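
Reviewer note: a standalone usage sketch for the new checker, assuming
metadataPath, conf and containerID are in scope (names as defined above;
the in-memory KeyValueContainerData argument may be null, since the checks
work off the on-disk copy):

    KeyValueContainerCheck checker =
        new KeyValueContainerCheck(metadataPath, conf, containerID, null);
    // fastCheck() is safe on OPEN containers; fullCheck(), a superset
    // that also iterates the block DB, should be reserved for CLOSED
    // and QUASI_CLOSED containers.
    KeyValueContainerCheck.KvCheckError error = checker.fastCheck();
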
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java
new file mode 100644
index 0000000000..dea7323d75
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.net.ntp.TimeStamp;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+/**
+ * Background Metadata scrubbing for Ozone Containers.
+ * Future scope : data(chunks) checksum verification.
+ */
+public class ContainerScrubber implements Runnable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerScrubber.class);
+ private final ContainerSet containerSet;
+ private final OzoneConfiguration config;
+ private final long timePerContainer = 10000; // 10 sec in millis
+  private volatile boolean halt;
+ private Thread scrubThread;
+
+ public ContainerScrubber(ContainerSet cSet, OzoneConfiguration conf) {
+ Preconditions.checkNotNull(cSet,
+ "ContainerScrubber received a null ContainerSet");
+ Preconditions.checkNotNull(conf);
+ this.containerSet = cSet;
+ this.config = conf;
+ this.halt = false;
+ this.scrubThread = null;
+ }
+
+  @Override
+  public void run() {
+    /*
+     * The outer daemon loop exits on down().
+     */
+    LOG.info("Background ContainerScrubber starting up");
+    while (true) {
+
+      scrub();
+
+      if (this.halt) {
+        break; // stop and exit if requested
+      }
+
+      try {
+        Thread.sleep(300000); /* 5 min between scans */
+      } catch (InterruptedException e) {
+        LOG.info("Background ContainerScrubber interrupted. Going to exit");
+        break;
+      }
+    }
+  }
+
+ /**
+ * Start the scrub scanner thread.
+ */
+ public void up() {
+
+ this.halt = false;
+ if (this.scrubThread == null) {
+ this.scrubThread = new Thread(this);
+ scrubThread.start();
+ } else {
+ LOG.info("Scrubber up called multiple times. Scrub thread already up.");
+ }
+ }
+
+ /**
+ * Stop the scrub scanner thread. Wait for thread to exit
+ */
+ public void down() {
+
+ this.halt = true;
+ if (scrubThread == null) {
+ LOG.info("Scrubber down invoked, but scrub thread is not running");
+ return;
+ }
+
+ this.scrubThread.interrupt();
+ try {
+ this.scrubThread.join();
+ } catch (Exception e) {
+ LOG.warn("Exception when waiting for Container Scrubber thread ", e);
+ } finally {
+ this.scrubThread = null;
+ }
+ }
+
+  /**
+   * Current implementation : fixed rate scrub, no feedback loop.
+   * Dynamic throttling based on system load monitoring is to be
+   * implemented later under jira [XXX].
+   *
+   * @param startTime time at which the check of the current container started
+   */
+ private void throttleScrubber(TimeStamp startTime) {
+ TimeStamp endTime = new TimeStamp(System.currentTimeMillis());
+ long timeTaken = endTime.getTime() - startTime.getTime();
+
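+    // Worked example (assumed numbers): with timePerContainer = 10000 ms,
+    // a container check that finished in 3000 ms sleeps for the remaining
+    // 7000 ms, pacing the scan at roughly one container every 10 seconds.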
+ if (timeTaken < timePerContainer) {
+ try {
+ Thread.sleep(timePerContainer - timeTaken);
+ } catch (InterruptedException e) {
+ LOG.debug("Ignoring interrupted sleep inside throttle");
+ }
+ }
+ }
+
+ private void scrub() {
+
+    Iterator<Container> containerIt = containerSet.getContainerIterator();
+ long count = 0;
+
+ while (containerIt.hasNext()) {
+ TimeStamp startTime = new TimeStamp(System.currentTimeMillis());
+ Container container = containerIt.next();
+
+ if (this.halt) {
+ break; // stop if requested
+ }
+
+ try {
+ container.check();
+ } catch (StorageContainerException e) {
+ LOG.error("Error unexpected exception {} for Container {}", e,
+ container.getContainerData().getContainerID());
+ // XXX Action required here
+ }
+ count++;
+
+ throttleScrubber(startTime);
+ }
+
+ LOG.debug("iterator ran integrity checks on {} containers", count);
+ }
+}
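
Reviewer note: the intended thread lifecycle, mirroring how OzoneContainer
wires the scrubber up below:

    ContainerScrubber scrubber = new ContainerScrubber(containerSet, conf);
    scrubber.up();   // start the background scan loop
    // ... datanode serves traffic ...
    scrubber.down(); // set halt, interrupt any sleep, join the thread
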
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 92d76ef65c..3bc060a343 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto
@@ -69,6 +70,7 @@ public class OzoneContainer {
private final XceiverServerSpi writeChannel;
private final XceiverServerSpi readChannel;
private final ContainerController controller;
+ private ContainerScrubber scrubber;
/**
* Construct OzoneContainer object.
@@ -82,6 +84,8 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
this.config = conf;
this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
this.containerSet = new ContainerSet();
+ this.scrubber = null;
+
buildContainerSet();
final ContainerMetrics metrics = ContainerMetrics.create(conf);
this.handlers = Maps.newHashMap();
@@ -139,6 +143,34 @@ private void buildContainerSet() {
}
+
+ /**
+ * Start background daemon thread for performing container integrity checks.
+ */
+ private void startContainerScrub() {
+ boolean enabled = config.getBoolean(
+ HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED,
+ HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT);
+
+ if (!enabled) {
+ LOG.info("Background container scrubber has been disabled by {}",
+ HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED);
+ } else {
+ this.scrubber = new ContainerScrubber(containerSet, config);
+ scrubber.up();
+ }
+ }
+
+ /**
+ * Stop the scanner thread and wait for thread to die.
+ */
+ private void stopContainerScrub() {
+ if (scrubber == null) {
+ return;
+ }
+ scrubber.down();
+ }
+
/**
* Starts serving requests to ozone container.
*
@@ -146,6 +178,7 @@ private void buildContainerSet() {
*/
public void start() throws IOException {
LOG.info("Attempting to start container services.");
+ startContainerScrub();
writeChannel.start();
readChannel.start();
hddsDispatcher.init();
@@ -157,6 +190,7 @@ public void start() throws IOException {
public void stop() {
//TODO: at end of container IO integration work.
LOG.info("Attempting to stop container services.");
+ stopContainerScrub();
writeChannel.stop();
readChannel.stop();
hddsDispatcher.shutdown();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
new file mode 100644
index 0000000000..f1b8fe7113
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *