From 24793d2d971788de904165f7490f17d79d078a6a Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Wed, 13 Mar 2019 04:32:39 +0900 Subject: [PATCH] HDDS-1163. Basic framework for Ozone Data Scrubber. Contributed by Supratim Deka. --- .../apache/hadoop/hdds/HddsConfigKeys.java | 5 +- .../src/main/resources/ozone-default.xml | 10 + .../common/interfaces/Container.java | 5 + .../container/keyvalue/KeyValueContainer.java | 69 ++- .../keyvalue/KeyValueContainerCheck.java | 432 ++++++++++++++++++ .../ozoneimpl/ContainerScrubber.java | 158 +++++++ .../container/ozoneimpl/OzoneContainer.java | 34 ++ .../keyvalue/TestKeyValueContainerCheck.java | 194 ++++++++ 8 files changed, 904 insertions(+), 3 deletions(-) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index 3dd28f6f0a..3bb38950d0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -65,6 +65,9 @@ public final class HddsConfigKeys { public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; public static final String HDDS_SCM_CHILLMODE_ENABLED = "hdds.scm.chillmode.enabled"; + public static final String HDDS_CONTAINERSCRUB_ENABLED = + "hdds.containerscrub.enabled"; + public static final boolean HDDS_CONTAINERSCRUB_ENABLED_DEFAULT = false; public static final boolean HDDS_SCM_CHILLMODE_ENABLED_DEFAULT = true; public static final String HDDS_SCM_CHILLMODE_MIN_DATANODE = "hdds.scm.chillmode.min.datanode"; @@ -255,4 +258,4 @@ private HddsConfigKeys() { public static final String HDDS_DATANODE_HTTP_KERBEROS_KEYTAB_FILE_KEY = "hdds.datanode.http.kerberos.keytab"; -} \ No newline at end of file +} diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 54eb5c83a1..331b5c4a2e 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1346,6 +1346,16 @@ + + hdds.containerscrub.enabled + false + DATANODE + + Boolean value to enable data and metadata scrubbing in the containers + running on each datanode. + + + hdds.container.action.max.limit 20 diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java index 89f09fd345..1fcaaf5182 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java @@ -151,4 +151,9 @@ ContainerReplicaProto getContainerReport() * updates the blockCommitSequenceId. */ void updateBlockCommitSequenceId(long blockCommitSequenceId); + + /** + * check and report the structural integrity of the container. + */ + void check() throws StorageContainerException; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index de1b109f39..20dfd9c925 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -565,8 +565,13 @@ public void writeLockInterruptibly() throws InterruptedException { */ @Override public File getContainerFile() { - return new File(containerData.getMetadataPath(), containerData - .getContainerID() + OzoneConsts.CONTAINER_EXTENSION); + return getContainerFile(containerData.getMetadataPath(), + containerData.getContainerID()); + } + + static File getContainerFile(String metadataPath, long containerId) { + return new File(metadataPath, + containerId + OzoneConsts.CONTAINER_EXTENSION); } @Override @@ -634,6 +639,66 @@ public File getContainerDBFile() { .getContainerID() + OzoneConsts.DN_CONTAINER_DB); } + /** + * run integrity checks on the Container metadata. + */ + public void check() throws StorageContainerException { + ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK; + long containerId = containerData.getContainerID(); + + switch (containerData.getState()) { + case OPEN: + level = ContainerCheckLevel.FAST_CHECK; + LOG.info("Doing Fast integrity checks for Container ID : {}," + + " because it is OPEN", containerId); + break; + case CLOSING: + level = ContainerCheckLevel.FAST_CHECK; + LOG.info("Doing Fast integrity checks for Container ID : {}," + + " because it is CLOSING", containerId); + break; + case CLOSED: + case QUASI_CLOSED: + level = ContainerCheckLevel.FULL_CHECK; + LOG.debug("Doing Full integrity checks for Container ID : {}," + + " because it is in {} state", containerId, + containerData.getState()); + break; + default: + throw new StorageContainerException( + "Invalid Container state found for Container : " + containerData + .getContainerID(), INVALID_CONTAINER_STATE); + } + + if (level == ContainerCheckLevel.NO_CHECK) { + LOG.debug("Skipping integrity checks for Container Id : {}", containerId); + return; + } + + KeyValueContainerCheck checker = + new KeyValueContainerCheck(containerData.getMetadataPath(), config, + containerId, containerData); + + switch (level) { + case FAST_CHECK: + checker.fastCheck(); + break; + case FULL_CHECK: + checker.fullCheck(); + break; + case NO_CHECK: + LOG.debug("Skipping integrity checks for Container Id : {}", containerId); + break; + default: + // we should not be here at all, scuttle the ship! + Preconditions.checkNotNull(0, "Invalid Containercheck level"); + } + } + + private enum ContainerCheckLevel { + NO_CHECK, FAST_CHECK, FULL_CHECK + } + /** * Creates a temporary file. * @param file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java new file mode 100644 index 0000000000..5366c27c77 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; +import org.apache.hadoop.utils.MetadataStore; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB; + +/** + * Class to run integrity checks on Datanode Containers. + * Provide infra for Data Scrubbing + */ + +public class KeyValueContainerCheck { + + private static final Logger LOG = LoggerFactory.getLogger(Container.class); + + private long containerID; + private KeyValueContainerData inMemContainerData; //from caller, maybe null + private KeyValueContainerData onDiskContainerData; //loaded from fs/disk + private Configuration checkConfig; + + private String metadataPath; + + public KeyValueContainerCheck(String metadataPath, Configuration conf, + long containerID, KeyValueContainerData containerData) { + Preconditions.checkArgument(metadataPath != null); + + this.checkConfig = conf; + this.containerID = containerID; + this.onDiskContainerData = null; + this.inMemContainerData = containerData; + this.metadataPath = metadataPath; + } + + /** + * fast checks are basic and do not look inside the metadata files. + * Or into the structures on disk. These checks can be done on Open + * containers as well without concurrency implications + * Checks : + * 1. check directory layout + * 2. check container file + * + * @return void + */ + + public KvCheckError fastCheck() { + + KvCheckError error; + LOG.trace("Running fast check for container {};", containerID); + + error = loadContainerData(); + if (error != KvCheckError.ERROR_NONE) { + return error; + } + + error = checkLayout(); + if (error != KvCheckError.ERROR_NONE) { + return error; + } + + error = checkContainerFile(); + + return error; + } + + /** + * full checks comprise scanning all metadata inside the container. + * Including the KV database. These checks are intrusive, consume more + * resources compared to fast checks and should only be done on Closed + * or Quasi-closed Containers. Concurrency being limited to delete + * workflows. + *

+ * fullCheck is a superset of fastCheck + * + * @return void + */ + public KvCheckError fullCheck() { + /** + + */ + KvCheckError error; + + error = fastCheck(); + if (error != KvCheckError.ERROR_NONE) { + + LOG.trace("fastCheck failed, aborting full check for Container {}", + containerID); + return error; + } + + error = checkBlockDB(); + + return error; + } + + /** + * Check the integrity of the directory structure of the container. + * + * @return error code or ERROR_NONE + */ + private KvCheckError checkLayout() { + boolean success; + KvCheckError error = KvCheckError.ERROR_NONE; + + // is metadataPath accessible as a directory? + try { + checkDirPath(metadataPath); + } catch (IOException ie) { + error = KvCheckError.METADATA_PATH_ACCESS; + handleCorruption(ie.getMessage(), error, ie); + return error; + } + + String chunksPath = onDiskContainerData.getChunksPath(); + // is chunksPath accessible as a directory? + try { + checkDirPath(chunksPath); + } catch (IOException ie) { + error = KvCheckError.CHUNKS_PATH_ACCESS; + handleCorruption(ie.getMessage(), error, ie); + return error; + } + + return error; + } + + private void checkDirPath(String path) throws IOException { + + File dirPath = new File(path); + String errStr = null; + boolean success = true; + + try { + if (!dirPath.isDirectory()) { + success = false; + errStr = "Not a directory [" + path + "]"; + } + } catch (SecurityException se) { + throw new IOException("Security exception checking dir [" + + path + "]", se); + } catch (Exception e) { + throw new IOException("Generic exception checking dir [" + + path + "]", e); + } + + try { + String[] ls = dirPath.list(); + if (ls == null) { + // null result implies operation failed + success = false; + errStr = "null listing for directory [" + path + "]"; + } + } catch (Exception e) { + throw new IOException("Exception listing dir [" + path + "]", e); + } + + if (!success) { + Preconditions.checkState(errStr != null); + throw new IOException(errStr); + } + } + + private KvCheckError checkContainerFile() { + /** + * compare the values in the container file loaded from disk, + * with the values we are expecting + */ + KvCheckError error = KvCheckError.ERROR_NONE; + String dbType; + Preconditions + .checkState(onDiskContainerData != null, "Container File not loaded"); + KvCheckAction next; + + try { + ContainerUtils.verifyChecksum(onDiskContainerData); + } catch (Exception e) { + error = KvCheckError.CONTAINERDATA_CKSUM; + handleCorruption("Container File Checksum mismatch", error, e); + return error; + } + + if (onDiskContainerData.getContainerType() + != ContainerProtos.ContainerType.KeyValueContainer) { + String errStr = "Bad Container type in Containerdata for " + containerID; + error = KvCheckError.CONTAINERDATA_TYPE; + handleCorruption(errStr, error, null); + return error; // Abort if we do not know the type of Container + } + + if (onDiskContainerData.getContainerID() != containerID) { + String errStr = + "Bad ContainerID field in Containerdata for " + containerID; + error = KvCheckError.CONTAINERDATA_ID; + next = handleCorruption(errStr, error, null); + if (next == KvCheckAction.ABORT) { + return error; + } // else continue checking other data elements + } + + dbType = onDiskContainerData.getContainerDBType(); + if (!dbType.equals(OZONE_METADATA_STORE_IMPL_ROCKSDB) && + !dbType.equals(OZONE_METADATA_STORE_IMPL_LEVELDB)) { + String errStr = "Unknown DBType [" + dbType + + "] in Container File for [" + containerID + "]"; + error = KvCheckError.CONTAINERDATA_DBTYPE; + handleCorruption(errStr, error, null); + return error; + } + + KeyValueContainerData kvData = onDiskContainerData; + if (!metadataPath.toString().equals(kvData.getMetadataPath())) { + String errStr = + "Bad metadata path in Containerdata for " + containerID + "Expected [" + + metadataPath.toString() + "] Got [" + kvData.getMetadataPath() + + "]"; + error = KvCheckError.CONTAINERDATA_METADATA_PATH; + next = handleCorruption(errStr, error, null); + if (next == KvCheckAction.ABORT) { + return error; + } + } + + return error; + } + + private KvCheckError checkBlockDB() { + /** + * Check the integrity of the DB inside each container. + * In Scope: + * 1. iterate over each key (Block) and locate the chunks for the block + * 2. garbage detection : chunks which exist in the filesystem, + * but not in the DB. This function is implemented as HDDS-1202 + * Not in scope: + * 1. chunk checksum verification. this is left to a separate + * slow chunk scanner + */ + KvCheckError error; + Preconditions.checkState(onDiskContainerData != null, + "invoke loadContainerData prior to calling this function"); + File dbFile; + File metaDir = new File(metadataPath); + + try { + dbFile = KeyValueContainerLocationUtil + .getContainerDBFile(metaDir, containerID); + + if (!dbFile.exists() || !dbFile.canRead()) { + + String dbFileErrorMsg = "Unable to access DB File [" + dbFile.toString() + + "] for Container [" + containerID + "] metadata path [" + + metadataPath + "]"; + error = KvCheckError.DB_ACCESS; + handleCorruption(dbFileErrorMsg, error, null); + return error; + } + } catch (Exception e) { + String dbFileErrorMessage = + "Exception when initializing DBFile" + "with metadatapath [" + + metadataPath + "] for Container [" + containerID + + "]"; + error = KvCheckError.DB_ACCESS; + handleCorruption(dbFileErrorMessage, error, e); + return error; + } + onDiskContainerData.setDbFile(dbFile); + + try { + MetadataStore db = BlockUtils + .getDB(onDiskContainerData, checkConfig); + error = iterateBlockDB(db); + } catch (Exception e) { + error = KvCheckError.DB_ITERATOR; + handleCorruption("Block DB Iterator aborted", error, e); + return error; + } + + return error; + } + + private KvCheckError iterateBlockDB(MetadataStore db) + throws IOException { + KvCheckError error = KvCheckError.ERROR_NONE; + Preconditions.checkState(db != null); + + // get "normal" keys from the Block DB + KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID, + new File(onDiskContainerData.getContainerPath())); + + // ensure there is a chunk file for each key in the DB + while (kvIter.hasNext()) { + BlockData block = kvIter.nextBlock(); + + List chunkInfoList = block.getChunks(); + for (ContainerProtos.ChunkInfo chunk : chunkInfoList) { + File chunkFile; + try { + chunkFile = ChunkUtils + .getChunkFile(onDiskContainerData, + ChunkInfo.getFromProtoBuf(chunk)); + } catch (Exception e) { + error = KvCheckError.MISSING_CHUNK_FILE; + handleCorruption("Unable to access chunk path", error, e); + return error; + } + + if (!chunkFile.exists()) { + error = KvCheckError.MISSING_CHUNK_FILE; + + // concurrent mutation in Block DB? lookup the block again. + byte[] bdata = db.get( + Longs.toByteArray(block.getBlockID().getLocalID())); + if (bdata == null) { + LOG.trace("concurrency with delete, ignoring deleted block"); + error = KvCheckError.ERROR_NONE; + break; // skip to next block from kvIter + } else { + handleCorruption("Missing chunk file", error, null); + return error; + } + } + } + } + + return error; + } + + private KvCheckError loadContainerData() { + KvCheckError error = KvCheckError.ERROR_NONE; + + File containerFile = KeyValueContainer + .getContainerFile(metadataPath.toString(), containerID); + + try { + onDiskContainerData = (KeyValueContainerData) ContainerDataYaml + .readContainerFile(containerFile); + } catch (IOException e) { + error = KvCheckError.FILE_LOAD; + handleCorruption("Unable to load Container File", error, e); + } + + return error; + } + + private KvCheckAction handleCorruption(String reason, + KvCheckError error, Exception e) { + + // XXX HDDS-1201 need to implement corruption handling/reporting + + String errStr = + "Corruption detected in container: [" + containerID + "] reason: [" + + reason + "] error code: [" + error + "]"; + String logMessage = null; + + StackTraceElement[] stackeElems = Thread.currentThread().getStackTrace(); + String caller = + "Corruption reported from Source File: [" + stackeElems[2].getFileName() + + "] Line: [" + stackeElems[2].getLineNumber() + "]"; + + if (e != null) { + logMessage = errStr + " exception: [" + e.getMessage() + "]"; + e.printStackTrace(); + } else { + logMessage = errStr; + } + + LOG.error(caller); + LOG.error(logMessage); + + return KvCheckAction.ABORT; + } + + /** + * Pre-defined error codes for Container Metadata check. + */ + public enum KvCheckError { + ERROR_NONE, + FILE_LOAD, // unable to load container metafile + METADATA_PATH_ACCESS, // metadata path is not accessible + CHUNKS_PATH_ACCESS, // chunks path is not accessible + CONTAINERDATA_ID, // bad Container-ID stored in Container file + CONTAINERDATA_METADATA_PATH, // bad metadata path in Container file + CONTAINERDATA_CHUNKS_PATH, // bad chunks path in Container file + CONTAINERDATA_CKSUM, // container file checksum mismatch + CONTAINERDATA_TYPE, // container file incorrect type of Container + CONTAINERDATA_DBTYPE, // unknown DB Type specified in Container File + DB_ACCESS, // unable to load Metastore DB + DB_ITERATOR, // unable to create block iterator for Metastore DB + MISSING_CHUNK_FILE // chunk file not found + } + + private enum KvCheckAction { + CONTINUE, // Continue with remaining checks on the corrupt Container + ABORT // Abort checks for the container + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java new file mode 100644 index 0000000000..dea7323d75 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.base.Preconditions; +import org.apache.commons.net.ntp.TimeStamp; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; + +/** + * Background Metadata scrubbing for Ozone Containers. + * Future scope : data(chunks) checksum verification. + */ +public class ContainerScrubber implements Runnable { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerScrubber.class); + private final ContainerSet containerSet; + private final OzoneConfiguration config; + private final long timePerContainer = 10000; // 10 sec in millis + private boolean halt; + private Thread scrubThread; + + public ContainerScrubber(ContainerSet cSet, OzoneConfiguration conf) { + Preconditions.checkNotNull(cSet, + "ContainerScrubber received a null ContainerSet"); + Preconditions.checkNotNull(conf); + this.containerSet = cSet; + this.config = conf; + this.halt = false; + this.scrubThread = null; + } + + @Override public void run() { + /** + * the outer daemon loop exits on down() + */ + LOG.info("Background ContainerScrubber starting up"); + while (true) { + + scrub(); + + if (this.halt) { + break; // stop and exit if requested + } + + try { + Thread.sleep(300000); /* 5 min between scans */ + } catch (InterruptedException e) { + LOG.info("Background ContainerScrubber interrupted. Going to exit"); + } + } + } + + /** + * Start the scrub scanner thread. + */ + public void up() { + + this.halt = false; + if (this.scrubThread == null) { + this.scrubThread = new Thread(this); + scrubThread.start(); + } else { + LOG.info("Scrubber up called multiple times. Scrub thread already up."); + } + } + + /** + * Stop the scrub scanner thread. Wait for thread to exit + */ + public void down() { + + this.halt = true; + if (scrubThread == null) { + LOG.info("Scrubber down invoked, but scrub thread is not running"); + return; + } + + this.scrubThread.interrupt(); + try { + this.scrubThread.join(); + } catch (Exception e) { + LOG.warn("Exception when waiting for Container Scrubber thread ", e); + } finally { + this.scrubThread = null; + } + } + + /** + * Current implementation : fixed rate scrub, no feedback loop. + * Dynamic throttling based on system load monitoring to be + * implemented later as jira [XXX] + * + * @param startTime + */ + private void throttleScrubber(TimeStamp startTime) { + TimeStamp endTime = new TimeStamp(System.currentTimeMillis()); + long timeTaken = endTime.getTime() - startTime.getTime(); + + if (timeTaken < timePerContainer) { + try { + Thread.sleep(timePerContainer - timeTaken); + } catch (InterruptedException e) { + LOG.debug("Ignoring interrupted sleep inside throttle"); + } + } + } + + private void scrub() { + + Iterator containerIt = containerSet.getContainerIterator(); + long count = 0; + + while (containerIt.hasNext()) { + TimeStamp startTime = new TimeStamp(System.currentTimeMillis()); + Container container = containerIt.next(); + + if (this.halt) { + break; // stop if requested + } + + try { + container.check(); + } catch (StorageContainerException e) { + LOG.error("Error unexpected exception {} for Container {}", e, + container.getContainerData().getContainerID()); + // XXX Action required here + } + count++; + + throttleScrubber(startTime); + } + + LOG.debug("iterator ran integrity checks on {} containers", count); + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 92d76ef65c..3bc060a343 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto @@ -69,6 +70,7 @@ public class OzoneContainer { private final XceiverServerSpi writeChannel; private final XceiverServerSpi readChannel; private final ContainerController controller; + private ContainerScrubber scrubber; /** * Construct OzoneContainer object. @@ -82,6 +84,8 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration this.config = conf; this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); this.containerSet = new ContainerSet(); + this.scrubber = null; + buildContainerSet(); final ContainerMetrics metrics = ContainerMetrics.create(conf); this.handlers = Maps.newHashMap(); @@ -139,6 +143,34 @@ private void buildContainerSet() { } + + /** + * Start background daemon thread for performing container integrity checks. + */ + private void startContainerScrub() { + boolean enabled = config.getBoolean( + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT); + + if (!enabled) { + LOG.info("Background container scrubber has been disabled by {}", + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED); + } else { + this.scrubber = new ContainerScrubber(containerSet, config); + scrubber.up(); + } + } + + /** + * Stop the scanner thread and wait for thread to die. + */ + private void stopContainerScrub() { + if (scrubber == null) { + return; + } + scrubber.down(); + } + /** * Starts serving requests to ozone container. * @@ -146,6 +178,7 @@ private void buildContainerSet() { */ public void start() throws IOException { LOG.info("Attempting to start container services."); + startContainerScrub(); writeChannel.start(); readChannel.start(); hddsDispatcher.init(); @@ -157,6 +190,7 @@ public void start() throws IOException { public void stop() { //TODO: at end of container IO integration work. LOG.info("Attempting to stop container services."); + stopContainerScrub(); writeChannel.stop(); readChannel.stop(); hddsDispatcher.shutdown(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java new file mode 100644 index 0000000000..f1b8fe7113 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue; + +import com.google.common.primitives.Longs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; +import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl; +import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.utils.MetadataStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; +import java.util.ArrayList; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB; +import static org.junit.Assert.assertTrue; + +/** + * Basic sanity test for the KeyValueContainerCheck class. + */ +@RunWith(Parameterized.class) public class TestKeyValueContainerCheck { + private final String storeImpl; + private KeyValueContainer container; + private KeyValueContainerData containerData; + private ChunkManagerImpl chunkManager; + private VolumeSet volumeSet; + private Configuration conf; + private File testRoot; + + public TestKeyValueContainerCheck(String metadataImpl) { + this.storeImpl = metadataImpl; + } + + @Parameterized.Parameters public static Collection data() { + return Arrays.asList(new Object[][] {{OZONE_METADATA_STORE_IMPL_LEVELDB}, + {OZONE_METADATA_STORE_IMPL_ROCKSDB}}); + } + + @Before public void setUp() throws Exception { + this.testRoot = GenericTestUtils.getRandomizedTestDir(); + conf = new OzoneConfiguration(); + conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath()); + conf.set(OZONE_METADATA_STORE_IMPL, storeImpl); + volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf); + } + + @After public void teardown() { + volumeSet.shutdown(); + FileUtil.fullyDelete(testRoot); + } + + /** + * Sanity test, when there are no corruptions induced. + * @throws Exception + */ + @Test public void testKeyValueContainerCheckNoCorruption() throws Exception { + long containerID = 101; + int deletedBlocks = 1; + int normalBlocks = 3; + int chunksPerBlock = 4; + KeyValueContainerCheck.KvCheckError error; + + // test Closed Container + createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536, + chunksPerBlock); + File chunksPath = new File(containerData.getChunksPath()); + assertTrue(chunksPath.listFiles().length + == (deletedBlocks + normalBlocks) * chunksPerBlock); + + KeyValueContainerCheck kvCheck = + new KeyValueContainerCheck(containerData.getMetadataPath(), conf, + containerID, containerData); + + // first run checks on a Open Container + error = kvCheck.fastCheck(); + assertTrue(error == KeyValueContainerCheck.KvCheckError.ERROR_NONE); + + container.close(); + + // next run checks on a Closed Container + error = kvCheck.fullCheck(); + assertTrue(error == KeyValueContainerCheck.KvCheckError.ERROR_NONE); + } + + /** + * Creates a container with normal and deleted blocks. + * First it will insert normal blocks, and then it will insert + * deleted blocks. + * @param containerId + * @param normalBlocks + * @param deletedBlocks + * @throws Exception + */ + private void createContainerWithBlocks(long containerId, int normalBlocks, + int deletedBlocks, long chunkLen, int chunksPerBlock) throws Exception { + long chunkCount; + String strBlock = "block"; + String strChunk = "-chunkFile"; + byte[] chunkData = new byte[(int) chunkLen]; + + containerData = new KeyValueContainerData(containerId, + (long) StorageUnit.MB.toBytes(chunksPerBlock * chunkLen), + UUID.randomUUID().toString(), UUID.randomUUID().toString()); + container = new KeyValueContainer(containerData, conf); + container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), + UUID.randomUUID().toString()); + MetadataStore metadataStore = BlockUtils.getDB(containerData, conf); + chunkManager = new ChunkManagerImpl(true); + + assertTrue(containerData.getChunksPath() != null); + File chunksPath = new File(containerData.getChunksPath()); + assertTrue(chunksPath.exists()); + // Initially chunks folder should be empty. + assertTrue(chunksPath.listFiles().length == 0); + + List chunkList = new ArrayList<>(); + for (int i = 0; i < (normalBlocks + deletedBlocks); i++) { + BlockID blockID = new BlockID(containerId, i); + BlockData blockData = new BlockData(blockID); + + chunkList.clear(); + for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) { + String chunkName = strBlock + i + strChunk + chunkCount; + long offset = chunkCount * chunkLen; + ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen); + chunkList.add(info.getProtoBufMessage()); + chunkManager + .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData), + new DispatcherContext.Builder() + .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA) + .build()); + chunkManager + .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData), + new DispatcherContext.Builder() + .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA) + .build()); + } + blockData.setChunks(chunkList); + + if (i >= normalBlocks) { + // deleted key + metadataStore.put(DFSUtil.string2Bytes( + OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()), + blockData.getProtoBufMessage().toByteArray()); + } else { + // normal key + metadataStore.put(Longs.toByteArray(blockID.getLocalID()), + blockData.getProtoBufMessage().toByteArray()); + } + } + } +} \ No newline at end of file