HDDS-1163. Basic framework for Ozone Data Scrubber. Contributed by Supratim Deka.
This commit is contained in:
parent
ab574ffd72
commit
24793d2d97
@ -65,6 +65,9 @@ public final class HddsConfigKeys {
|
|||||||
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
|
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
|
||||||
public static final String HDDS_SCM_CHILLMODE_ENABLED =
|
public static final String HDDS_SCM_CHILLMODE_ENABLED =
|
||||||
"hdds.scm.chillmode.enabled";
|
"hdds.scm.chillmode.enabled";
|
||||||
|
public static final String HDDS_CONTAINERSCRUB_ENABLED =
|
||||||
|
"hdds.containerscrub.enabled";
|
||||||
|
public static final boolean HDDS_CONTAINERSCRUB_ENABLED_DEFAULT = false;
|
||||||
public static final boolean HDDS_SCM_CHILLMODE_ENABLED_DEFAULT = true;
|
public static final boolean HDDS_SCM_CHILLMODE_ENABLED_DEFAULT = true;
|
||||||
public static final String HDDS_SCM_CHILLMODE_MIN_DATANODE =
|
public static final String HDDS_SCM_CHILLMODE_MIN_DATANODE =
|
||||||
"hdds.scm.chillmode.min.datanode";
|
"hdds.scm.chillmode.min.datanode";
|
||||||
|
@ -1346,6 +1346,16 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>hdds.containerscrub.enabled</name>
|
||||||
|
<value>false</value>
|
||||||
|
<tag>DATANODE</tag>
|
||||||
|
<description>
|
||||||
|
Boolean value to enable data and metadata scrubbing in the containers
|
||||||
|
running on each datanode.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>hdds.container.action.max.limit</name>
|
<name>hdds.container.action.max.limit</name>
|
||||||
<value>20</value>
|
<value>20</value>
|
||||||
|
@ -151,4 +151,9 @@ ContainerReplicaProto getContainerReport()
|
|||||||
* updates the blockCommitSequenceId.
|
* updates the blockCommitSequenceId.
|
||||||
*/
|
*/
|
||||||
void updateBlockCommitSequenceId(long blockCommitSequenceId);
|
void updateBlockCommitSequenceId(long blockCommitSequenceId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* check and report the structural integrity of the container.
|
||||||
|
*/
|
||||||
|
void check() throws StorageContainerException;
|
||||||
}
|
}
|
||||||
|
@ -565,8 +565,13 @@ public void writeLockInterruptibly() throws InterruptedException {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public File getContainerFile() {
|
public File getContainerFile() {
|
||||||
return new File(containerData.getMetadataPath(), containerData
|
return getContainerFile(containerData.getMetadataPath(),
|
||||||
.getContainerID() + OzoneConsts.CONTAINER_EXTENSION);
|
containerData.getContainerID());
|
||||||
|
}
|
||||||
|
|
||||||
|
static File getContainerFile(String metadataPath, long containerId) {
|
||||||
|
return new File(metadataPath,
|
||||||
|
containerId + OzoneConsts.CONTAINER_EXTENSION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -634,6 +639,66 @@ public File getContainerDBFile() {
|
|||||||
.getContainerID() + OzoneConsts.DN_CONTAINER_DB);
|
.getContainerID() + OzoneConsts.DN_CONTAINER_DB);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* run integrity checks on the Container metadata.
|
||||||
|
*/
|
||||||
|
public void check() throws StorageContainerException {
|
||||||
|
ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK;
|
||||||
|
long containerId = containerData.getContainerID();
|
||||||
|
|
||||||
|
switch (containerData.getState()) {
|
||||||
|
case OPEN:
|
||||||
|
level = ContainerCheckLevel.FAST_CHECK;
|
||||||
|
LOG.info("Doing Fast integrity checks for Container ID : {},"
|
||||||
|
+ " because it is OPEN", containerId);
|
||||||
|
break;
|
||||||
|
case CLOSING:
|
||||||
|
level = ContainerCheckLevel.FAST_CHECK;
|
||||||
|
LOG.info("Doing Fast integrity checks for Container ID : {},"
|
||||||
|
+ " because it is CLOSING", containerId);
|
||||||
|
break;
|
||||||
|
case CLOSED:
|
||||||
|
case QUASI_CLOSED:
|
||||||
|
level = ContainerCheckLevel.FULL_CHECK;
|
||||||
|
LOG.debug("Doing Full integrity checks for Container ID : {},"
|
||||||
|
+ " because it is in {} state", containerId,
|
||||||
|
containerData.getState());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Invalid Container state found for Container : " + containerData
|
||||||
|
.getContainerID(), INVALID_CONTAINER_STATE);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (level == ContainerCheckLevel.NO_CHECK) {
|
||||||
|
LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
KeyValueContainerCheck checker =
|
||||||
|
new KeyValueContainerCheck(containerData.getMetadataPath(), config,
|
||||||
|
containerId, containerData);
|
||||||
|
|
||||||
|
switch (level) {
|
||||||
|
case FAST_CHECK:
|
||||||
|
checker.fastCheck();
|
||||||
|
break;
|
||||||
|
case FULL_CHECK:
|
||||||
|
checker.fullCheck();
|
||||||
|
break;
|
||||||
|
case NO_CHECK:
|
||||||
|
LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// we should not be here at all, scuttle the ship!
|
||||||
|
Preconditions.checkNotNull(0, "Invalid Containercheck level");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum ContainerCheckLevel {
|
||||||
|
NO_CHECK, FAST_CHECK, FULL_CHECK
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a temporary file.
|
* Creates a temporary file.
|
||||||
* @param file
|
* @param file
|
||||||
|
@ -0,0 +1,432 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.keyvalue;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||||
|
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||||
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
|
||||||
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
|
||||||
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
|
||||||
|
import org.apache.hadoop.utils.MetadataStore;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to run integrity checks on Datanode Containers.
|
||||||
|
* Provide infra for Data Scrubbing
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class KeyValueContainerCheck {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Container.class);
|
||||||
|
|
||||||
|
private long containerID;
|
||||||
|
private KeyValueContainerData inMemContainerData; //from caller, maybe null
|
||||||
|
private KeyValueContainerData onDiskContainerData; //loaded from fs/disk
|
||||||
|
private Configuration checkConfig;
|
||||||
|
|
||||||
|
private String metadataPath;
|
||||||
|
|
||||||
|
public KeyValueContainerCheck(String metadataPath, Configuration conf,
|
||||||
|
long containerID, KeyValueContainerData containerData) {
|
||||||
|
Preconditions.checkArgument(metadataPath != null);
|
||||||
|
|
||||||
|
this.checkConfig = conf;
|
||||||
|
this.containerID = containerID;
|
||||||
|
this.onDiskContainerData = null;
|
||||||
|
this.inMemContainerData = containerData;
|
||||||
|
this.metadataPath = metadataPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* fast checks are basic and do not look inside the metadata files.
|
||||||
|
* Or into the structures on disk. These checks can be done on Open
|
||||||
|
* containers as well without concurrency implications
|
||||||
|
* Checks :
|
||||||
|
* 1. check directory layout
|
||||||
|
* 2. check container file
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
|
||||||
|
public KvCheckError fastCheck() {
|
||||||
|
|
||||||
|
KvCheckError error;
|
||||||
|
LOG.trace("Running fast check for container {};", containerID);
|
||||||
|
|
||||||
|
error = loadContainerData();
|
||||||
|
if (error != KvCheckError.ERROR_NONE) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
error = checkLayout();
|
||||||
|
if (error != KvCheckError.ERROR_NONE) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
error = checkContainerFile();
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* full checks comprise scanning all metadata inside the container.
|
||||||
|
* Including the KV database. These checks are intrusive, consume more
|
||||||
|
* resources compared to fast checks and should only be done on Closed
|
||||||
|
* or Quasi-closed Containers. Concurrency being limited to delete
|
||||||
|
* workflows.
|
||||||
|
* <p>
|
||||||
|
* fullCheck is a superset of fastCheck
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public KvCheckError fullCheck() {
|
||||||
|
/**
|
||||||
|
|
||||||
|
*/
|
||||||
|
KvCheckError error;
|
||||||
|
|
||||||
|
error = fastCheck();
|
||||||
|
if (error != KvCheckError.ERROR_NONE) {
|
||||||
|
|
||||||
|
LOG.trace("fastCheck failed, aborting full check for Container {}",
|
||||||
|
containerID);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
error = checkBlockDB();
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check the integrity of the directory structure of the container.
|
||||||
|
*
|
||||||
|
* @return error code or ERROR_NONE
|
||||||
|
*/
|
||||||
|
private KvCheckError checkLayout() {
|
||||||
|
boolean success;
|
||||||
|
KvCheckError error = KvCheckError.ERROR_NONE;
|
||||||
|
|
||||||
|
// is metadataPath accessible as a directory?
|
||||||
|
try {
|
||||||
|
checkDirPath(metadataPath);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
error = KvCheckError.METADATA_PATH_ACCESS;
|
||||||
|
handleCorruption(ie.getMessage(), error, ie);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
String chunksPath = onDiskContainerData.getChunksPath();
|
||||||
|
// is chunksPath accessible as a directory?
|
||||||
|
try {
|
||||||
|
checkDirPath(chunksPath);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
error = KvCheckError.CHUNKS_PATH_ACCESS;
|
||||||
|
handleCorruption(ie.getMessage(), error, ie);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkDirPath(String path) throws IOException {
|
||||||
|
|
||||||
|
File dirPath = new File(path);
|
||||||
|
String errStr = null;
|
||||||
|
boolean success = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!dirPath.isDirectory()) {
|
||||||
|
success = false;
|
||||||
|
errStr = "Not a directory [" + path + "]";
|
||||||
|
}
|
||||||
|
} catch (SecurityException se) {
|
||||||
|
throw new IOException("Security exception checking dir ["
|
||||||
|
+ path + "]", se);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException("Generic exception checking dir ["
|
||||||
|
+ path + "]", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
String[] ls = dirPath.list();
|
||||||
|
if (ls == null) {
|
||||||
|
// null result implies operation failed
|
||||||
|
success = false;
|
||||||
|
errStr = "null listing for directory [" + path + "]";
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException("Exception listing dir [" + path + "]", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!success) {
|
||||||
|
Preconditions.checkState(errStr != null);
|
||||||
|
throw new IOException(errStr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private KvCheckError checkContainerFile() {
|
||||||
|
/**
|
||||||
|
* compare the values in the container file loaded from disk,
|
||||||
|
* with the values we are expecting
|
||||||
|
*/
|
||||||
|
KvCheckError error = KvCheckError.ERROR_NONE;
|
||||||
|
String dbType;
|
||||||
|
Preconditions
|
||||||
|
.checkState(onDiskContainerData != null, "Container File not loaded");
|
||||||
|
KvCheckAction next;
|
||||||
|
|
||||||
|
try {
|
||||||
|
ContainerUtils.verifyChecksum(onDiskContainerData);
|
||||||
|
} catch (Exception e) {
|
||||||
|
error = KvCheckError.CONTAINERDATA_CKSUM;
|
||||||
|
handleCorruption("Container File Checksum mismatch", error, e);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (onDiskContainerData.getContainerType()
|
||||||
|
!= ContainerProtos.ContainerType.KeyValueContainer) {
|
||||||
|
String errStr = "Bad Container type in Containerdata for " + containerID;
|
||||||
|
error = KvCheckError.CONTAINERDATA_TYPE;
|
||||||
|
handleCorruption(errStr, error, null);
|
||||||
|
return error; // Abort if we do not know the type of Container
|
||||||
|
}
|
||||||
|
|
||||||
|
if (onDiskContainerData.getContainerID() != containerID) {
|
||||||
|
String errStr =
|
||||||
|
"Bad ContainerID field in Containerdata for " + containerID;
|
||||||
|
error = KvCheckError.CONTAINERDATA_ID;
|
||||||
|
next = handleCorruption(errStr, error, null);
|
||||||
|
if (next == KvCheckAction.ABORT) {
|
||||||
|
return error;
|
||||||
|
} // else continue checking other data elements
|
||||||
|
}
|
||||||
|
|
||||||
|
dbType = onDiskContainerData.getContainerDBType();
|
||||||
|
if (!dbType.equals(OZONE_METADATA_STORE_IMPL_ROCKSDB) &&
|
||||||
|
!dbType.equals(OZONE_METADATA_STORE_IMPL_LEVELDB)) {
|
||||||
|
String errStr = "Unknown DBType [" + dbType
|
||||||
|
+ "] in Container File for [" + containerID + "]";
|
||||||
|
error = KvCheckError.CONTAINERDATA_DBTYPE;
|
||||||
|
handleCorruption(errStr, error, null);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
KeyValueContainerData kvData = onDiskContainerData;
|
||||||
|
if (!metadataPath.toString().equals(kvData.getMetadataPath())) {
|
||||||
|
String errStr =
|
||||||
|
"Bad metadata path in Containerdata for " + containerID + "Expected ["
|
||||||
|
+ metadataPath.toString() + "] Got [" + kvData.getMetadataPath()
|
||||||
|
+ "]";
|
||||||
|
error = KvCheckError.CONTAINERDATA_METADATA_PATH;
|
||||||
|
next = handleCorruption(errStr, error, null);
|
||||||
|
if (next == KvCheckAction.ABORT) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
private KvCheckError checkBlockDB() {
|
||||||
|
/**
|
||||||
|
* Check the integrity of the DB inside each container.
|
||||||
|
* In Scope:
|
||||||
|
* 1. iterate over each key (Block) and locate the chunks for the block
|
||||||
|
* 2. garbage detection : chunks which exist in the filesystem,
|
||||||
|
* but not in the DB. This function is implemented as HDDS-1202
|
||||||
|
* Not in scope:
|
||||||
|
* 1. chunk checksum verification. this is left to a separate
|
||||||
|
* slow chunk scanner
|
||||||
|
*/
|
||||||
|
KvCheckError error;
|
||||||
|
Preconditions.checkState(onDiskContainerData != null,
|
||||||
|
"invoke loadContainerData prior to calling this function");
|
||||||
|
File dbFile;
|
||||||
|
File metaDir = new File(metadataPath);
|
||||||
|
|
||||||
|
try {
|
||||||
|
dbFile = KeyValueContainerLocationUtil
|
||||||
|
.getContainerDBFile(metaDir, containerID);
|
||||||
|
|
||||||
|
if (!dbFile.exists() || !dbFile.canRead()) {
|
||||||
|
|
||||||
|
String dbFileErrorMsg = "Unable to access DB File [" + dbFile.toString()
|
||||||
|
+ "] for Container [" + containerID + "] metadata path ["
|
||||||
|
+ metadataPath + "]";
|
||||||
|
error = KvCheckError.DB_ACCESS;
|
||||||
|
handleCorruption(dbFileErrorMsg, error, null);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
String dbFileErrorMessage =
|
||||||
|
"Exception when initializing DBFile" + "with metadatapath ["
|
||||||
|
+ metadataPath + "] for Container [" + containerID
|
||||||
|
+ "]";
|
||||||
|
error = KvCheckError.DB_ACCESS;
|
||||||
|
handleCorruption(dbFileErrorMessage, error, e);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
onDiskContainerData.setDbFile(dbFile);
|
||||||
|
|
||||||
|
try {
|
||||||
|
MetadataStore db = BlockUtils
|
||||||
|
.getDB(onDiskContainerData, checkConfig);
|
||||||
|
error = iterateBlockDB(db);
|
||||||
|
} catch (Exception e) {
|
||||||
|
error = KvCheckError.DB_ITERATOR;
|
||||||
|
handleCorruption("Block DB Iterator aborted", error, e);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
private KvCheckError iterateBlockDB(MetadataStore db)
|
||||||
|
throws IOException {
|
||||||
|
KvCheckError error = KvCheckError.ERROR_NONE;
|
||||||
|
Preconditions.checkState(db != null);
|
||||||
|
|
||||||
|
// get "normal" keys from the Block DB
|
||||||
|
KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
|
||||||
|
new File(onDiskContainerData.getContainerPath()));
|
||||||
|
|
||||||
|
// ensure there is a chunk file for each key in the DB
|
||||||
|
while (kvIter.hasNext()) {
|
||||||
|
BlockData block = kvIter.nextBlock();
|
||||||
|
|
||||||
|
List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
|
||||||
|
for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
|
||||||
|
File chunkFile;
|
||||||
|
try {
|
||||||
|
chunkFile = ChunkUtils
|
||||||
|
.getChunkFile(onDiskContainerData,
|
||||||
|
ChunkInfo.getFromProtoBuf(chunk));
|
||||||
|
} catch (Exception e) {
|
||||||
|
error = KvCheckError.MISSING_CHUNK_FILE;
|
||||||
|
handleCorruption("Unable to access chunk path", error, e);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!chunkFile.exists()) {
|
||||||
|
error = KvCheckError.MISSING_CHUNK_FILE;
|
||||||
|
|
||||||
|
// concurrent mutation in Block DB? lookup the block again.
|
||||||
|
byte[] bdata = db.get(
|
||||||
|
Longs.toByteArray(block.getBlockID().getLocalID()));
|
||||||
|
if (bdata == null) {
|
||||||
|
LOG.trace("concurrency with delete, ignoring deleted block");
|
||||||
|
error = KvCheckError.ERROR_NONE;
|
||||||
|
break; // skip to next block from kvIter
|
||||||
|
} else {
|
||||||
|
handleCorruption("Missing chunk file", error, null);
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
private KvCheckError loadContainerData() {
|
||||||
|
KvCheckError error = KvCheckError.ERROR_NONE;
|
||||||
|
|
||||||
|
File containerFile = KeyValueContainer
|
||||||
|
.getContainerFile(metadataPath.toString(), containerID);
|
||||||
|
|
||||||
|
try {
|
||||||
|
onDiskContainerData = (KeyValueContainerData) ContainerDataYaml
|
||||||
|
.readContainerFile(containerFile);
|
||||||
|
} catch (IOException e) {
|
||||||
|
error = KvCheckError.FILE_LOAD;
|
||||||
|
handleCorruption("Unable to load Container File", error, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
private KvCheckAction handleCorruption(String reason,
|
||||||
|
KvCheckError error, Exception e) {
|
||||||
|
|
||||||
|
// XXX HDDS-1201 need to implement corruption handling/reporting
|
||||||
|
|
||||||
|
String errStr =
|
||||||
|
"Corruption detected in container: [" + containerID + "] reason: ["
|
||||||
|
+ reason + "] error code: [" + error + "]";
|
||||||
|
String logMessage = null;
|
||||||
|
|
||||||
|
StackTraceElement[] stackeElems = Thread.currentThread().getStackTrace();
|
||||||
|
String caller =
|
||||||
|
"Corruption reported from Source File: [" + stackeElems[2].getFileName()
|
||||||
|
+ "] Line: [" + stackeElems[2].getLineNumber() + "]";
|
||||||
|
|
||||||
|
if (e != null) {
|
||||||
|
logMessage = errStr + " exception: [" + e.getMessage() + "]";
|
||||||
|
e.printStackTrace();
|
||||||
|
} else {
|
||||||
|
logMessage = errStr;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.error(caller);
|
||||||
|
LOG.error(logMessage);
|
||||||
|
|
||||||
|
return KvCheckAction.ABORT;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pre-defined error codes for Container Metadata check.
|
||||||
|
*/
|
||||||
|
public enum KvCheckError {
|
||||||
|
ERROR_NONE,
|
||||||
|
FILE_LOAD, // unable to load container metafile
|
||||||
|
METADATA_PATH_ACCESS, // metadata path is not accessible
|
||||||
|
CHUNKS_PATH_ACCESS, // chunks path is not accessible
|
||||||
|
CONTAINERDATA_ID, // bad Container-ID stored in Container file
|
||||||
|
CONTAINERDATA_METADATA_PATH, // bad metadata path in Container file
|
||||||
|
CONTAINERDATA_CHUNKS_PATH, // bad chunks path in Container file
|
||||||
|
CONTAINERDATA_CKSUM, // container file checksum mismatch
|
||||||
|
CONTAINERDATA_TYPE, // container file incorrect type of Container
|
||||||
|
CONTAINERDATA_DBTYPE, // unknown DB Type specified in Container File
|
||||||
|
DB_ACCESS, // unable to load Metastore DB
|
||||||
|
DB_ITERATOR, // unable to create block iterator for Metastore DB
|
||||||
|
MISSING_CHUNK_FILE // chunk file not found
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum KvCheckAction {
|
||||||
|
CONTINUE, // Continue with remaining checks on the corrupt Container
|
||||||
|
ABORT // Abort checks for the container
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,158 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.net.ntp.TimeStamp;
|
||||||
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||||
|
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Background Metadata scrubbing for Ozone Containers.
|
||||||
|
* Future scope : data(chunks) checksum verification.
|
||||||
|
*/
|
||||||
|
public class ContainerScrubber implements Runnable {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ContainerScrubber.class);
|
||||||
|
private final ContainerSet containerSet;
|
||||||
|
private final OzoneConfiguration config;
|
||||||
|
private final long timePerContainer = 10000; // 10 sec in millis
|
||||||
|
private boolean halt;
|
||||||
|
private Thread scrubThread;
|
||||||
|
|
||||||
|
public ContainerScrubber(ContainerSet cSet, OzoneConfiguration conf) {
|
||||||
|
Preconditions.checkNotNull(cSet,
|
||||||
|
"ContainerScrubber received a null ContainerSet");
|
||||||
|
Preconditions.checkNotNull(conf);
|
||||||
|
this.containerSet = cSet;
|
||||||
|
this.config = conf;
|
||||||
|
this.halt = false;
|
||||||
|
this.scrubThread = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void run() {
|
||||||
|
/**
|
||||||
|
* the outer daemon loop exits on down()
|
||||||
|
*/
|
||||||
|
LOG.info("Background ContainerScrubber starting up");
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
scrub();
|
||||||
|
|
||||||
|
if (this.halt) {
|
||||||
|
break; // stop and exit if requested
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(300000); /* 5 min between scans */
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.info("Background ContainerScrubber interrupted. Going to exit");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the scrub scanner thread.
|
||||||
|
*/
|
||||||
|
public void up() {
|
||||||
|
|
||||||
|
this.halt = false;
|
||||||
|
if (this.scrubThread == null) {
|
||||||
|
this.scrubThread = new Thread(this);
|
||||||
|
scrubThread.start();
|
||||||
|
} else {
|
||||||
|
LOG.info("Scrubber up called multiple times. Scrub thread already up.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the scrub scanner thread. Wait for thread to exit
|
||||||
|
*/
|
||||||
|
public void down() {
|
||||||
|
|
||||||
|
this.halt = true;
|
||||||
|
if (scrubThread == null) {
|
||||||
|
LOG.info("Scrubber down invoked, but scrub thread is not running");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.scrubThread.interrupt();
|
||||||
|
try {
|
||||||
|
this.scrubThread.join();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Exception when waiting for Container Scrubber thread ", e);
|
||||||
|
} finally {
|
||||||
|
this.scrubThread = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Current implementation : fixed rate scrub, no feedback loop.
|
||||||
|
* Dynamic throttling based on system load monitoring to be
|
||||||
|
* implemented later as jira [XXX]
|
||||||
|
*
|
||||||
|
* @param startTime
|
||||||
|
*/
|
||||||
|
private void throttleScrubber(TimeStamp startTime) {
|
||||||
|
TimeStamp endTime = new TimeStamp(System.currentTimeMillis());
|
||||||
|
long timeTaken = endTime.getTime() - startTime.getTime();
|
||||||
|
|
||||||
|
if (timeTaken < timePerContainer) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(timePerContainer - timeTaken);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.debug("Ignoring interrupted sleep inside throttle");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void scrub() {
|
||||||
|
|
||||||
|
Iterator<Container> containerIt = containerSet.getContainerIterator();
|
||||||
|
long count = 0;
|
||||||
|
|
||||||
|
while (containerIt.hasNext()) {
|
||||||
|
TimeStamp startTime = new TimeStamp(System.currentTimeMillis());
|
||||||
|
Container container = containerIt.next();
|
||||||
|
|
||||||
|
if (this.halt) {
|
||||||
|
break; // stop if requested
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
container.check();
|
||||||
|
} catch (StorageContainerException e) {
|
||||||
|
LOG.error("Error unexpected exception {} for Container {}", e,
|
||||||
|
container.getContainerData().getContainerID());
|
||||||
|
// XXX Action required here
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
|
||||||
|
throttleScrubber(startTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("iterator ran integrity checks on {} containers", count);
|
||||||
|
}
|
||||||
|
}
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto
|
import org.apache.hadoop.hdds.protocol.datanode.proto
|
||||||
@ -69,6 +70,7 @@ public class OzoneContainer {
|
|||||||
private final XceiverServerSpi writeChannel;
|
private final XceiverServerSpi writeChannel;
|
||||||
private final XceiverServerSpi readChannel;
|
private final XceiverServerSpi readChannel;
|
||||||
private final ContainerController controller;
|
private final ContainerController controller;
|
||||||
|
private ContainerScrubber scrubber;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct OzoneContainer object.
|
* Construct OzoneContainer object.
|
||||||
@ -82,6 +84,8 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
|
|||||||
this.config = conf;
|
this.config = conf;
|
||||||
this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
|
this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
|
||||||
this.containerSet = new ContainerSet();
|
this.containerSet = new ContainerSet();
|
||||||
|
this.scrubber = null;
|
||||||
|
|
||||||
buildContainerSet();
|
buildContainerSet();
|
||||||
final ContainerMetrics metrics = ContainerMetrics.create(conf);
|
final ContainerMetrics metrics = ContainerMetrics.create(conf);
|
||||||
this.handlers = Maps.newHashMap();
|
this.handlers = Maps.newHashMap();
|
||||||
@ -139,6 +143,34 @@ private void buildContainerSet() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start background daemon thread for performing container integrity checks.
|
||||||
|
*/
|
||||||
|
private void startContainerScrub() {
|
||||||
|
boolean enabled = config.getBoolean(
|
||||||
|
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED,
|
||||||
|
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT);
|
||||||
|
|
||||||
|
if (!enabled) {
|
||||||
|
LOG.info("Background container scrubber has been disabled by {}",
|
||||||
|
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED);
|
||||||
|
} else {
|
||||||
|
this.scrubber = new ContainerScrubber(containerSet, config);
|
||||||
|
scrubber.up();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the scanner thread and wait for thread to die.
|
||||||
|
*/
|
||||||
|
private void stopContainerScrub() {
|
||||||
|
if (scrubber == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
scrubber.down();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts serving requests to ozone container.
|
* Starts serving requests to ozone container.
|
||||||
*
|
*
|
||||||
@ -146,6 +178,7 @@ private void buildContainerSet() {
|
|||||||
*/
|
*/
|
||||||
public void start() throws IOException {
|
public void start() throws IOException {
|
||||||
LOG.info("Attempting to start container services.");
|
LOG.info("Attempting to start container services.");
|
||||||
|
startContainerScrub();
|
||||||
writeChannel.start();
|
writeChannel.start();
|
||||||
readChannel.start();
|
readChannel.start();
|
||||||
hddsDispatcher.init();
|
hddsDispatcher.init();
|
||||||
@ -157,6 +190,7 @@ public void start() throws IOException {
|
|||||||
public void stop() {
|
public void stop() {
|
||||||
//TODO: at end of container IO integration work.
|
//TODO: at end of container IO integration work.
|
||||||
LOG.info("Attempting to stop container services.");
|
LOG.info("Attempting to stop container services.");
|
||||||
|
stopContainerScrub();
|
||||||
writeChannel.stop();
|
writeChannel.stop();
|
||||||
readChannel.stop();
|
readChannel.stop();
|
||||||
hddsDispatcher.shutdown();
|
hddsDispatcher.shutdown();
|
||||||
|
@ -0,0 +1,194 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.keyvalue;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.StorageUnit;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.hdds.client.BlockID;
|
||||||
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
|
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
|
||||||
|
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
|
||||||
|
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
|
||||||
|
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||||
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.utils.MetadataStore;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Basic sanity test for the KeyValueContainerCheck class.
|
||||||
|
*/
|
||||||
|
@RunWith(Parameterized.class) public class TestKeyValueContainerCheck {
|
||||||
|
private final String storeImpl;
|
||||||
|
private KeyValueContainer container;
|
||||||
|
private KeyValueContainerData containerData;
|
||||||
|
private ChunkManagerImpl chunkManager;
|
||||||
|
private VolumeSet volumeSet;
|
||||||
|
private Configuration conf;
|
||||||
|
private File testRoot;
|
||||||
|
|
||||||
|
public TestKeyValueContainerCheck(String metadataImpl) {
|
||||||
|
this.storeImpl = metadataImpl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters public static Collection<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] {{OZONE_METADATA_STORE_IMPL_LEVELDB},
|
||||||
|
{OZONE_METADATA_STORE_IMPL_ROCKSDB}});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before public void setUp() throws Exception {
|
||||||
|
this.testRoot = GenericTestUtils.getRandomizedTestDir();
|
||||||
|
conf = new OzoneConfiguration();
|
||||||
|
conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
|
||||||
|
conf.set(OZONE_METADATA_STORE_IMPL, storeImpl);
|
||||||
|
volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After public void teardown() {
|
||||||
|
volumeSet.shutdown();
|
||||||
|
FileUtil.fullyDelete(testRoot);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sanity test, when there are no corruptions induced.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test public void testKeyValueContainerCheckNoCorruption() throws Exception {
|
||||||
|
long containerID = 101;
|
||||||
|
int deletedBlocks = 1;
|
||||||
|
int normalBlocks = 3;
|
||||||
|
int chunksPerBlock = 4;
|
||||||
|
KeyValueContainerCheck.KvCheckError error;
|
||||||
|
|
||||||
|
// test Closed Container
|
||||||
|
createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536,
|
||||||
|
chunksPerBlock);
|
||||||
|
File chunksPath = new File(containerData.getChunksPath());
|
||||||
|
assertTrue(chunksPath.listFiles().length
|
||||||
|
== (deletedBlocks + normalBlocks) * chunksPerBlock);
|
||||||
|
|
||||||
|
KeyValueContainerCheck kvCheck =
|
||||||
|
new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
|
||||||
|
containerID, containerData);
|
||||||
|
|
||||||
|
// first run checks on a Open Container
|
||||||
|
error = kvCheck.fastCheck();
|
||||||
|
assertTrue(error == KeyValueContainerCheck.KvCheckError.ERROR_NONE);
|
||||||
|
|
||||||
|
container.close();
|
||||||
|
|
||||||
|
// next run checks on a Closed Container
|
||||||
|
error = kvCheck.fullCheck();
|
||||||
|
assertTrue(error == KeyValueContainerCheck.KvCheckError.ERROR_NONE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a container with normal and deleted blocks.
|
||||||
|
* First it will insert normal blocks, and then it will insert
|
||||||
|
* deleted blocks.
|
||||||
|
* @param containerId
|
||||||
|
* @param normalBlocks
|
||||||
|
* @param deletedBlocks
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private void createContainerWithBlocks(long containerId, int normalBlocks,
|
||||||
|
int deletedBlocks, long chunkLen, int chunksPerBlock) throws Exception {
|
||||||
|
long chunkCount;
|
||||||
|
String strBlock = "block";
|
||||||
|
String strChunk = "-chunkFile";
|
||||||
|
byte[] chunkData = new byte[(int) chunkLen];
|
||||||
|
|
||||||
|
containerData = new KeyValueContainerData(containerId,
|
||||||
|
(long) StorageUnit.MB.toBytes(chunksPerBlock * chunkLen),
|
||||||
|
UUID.randomUUID().toString(), UUID.randomUUID().toString());
|
||||||
|
container = new KeyValueContainer(containerData, conf);
|
||||||
|
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
|
||||||
|
UUID.randomUUID().toString());
|
||||||
|
MetadataStore metadataStore = BlockUtils.getDB(containerData, conf);
|
||||||
|
chunkManager = new ChunkManagerImpl(true);
|
||||||
|
|
||||||
|
assertTrue(containerData.getChunksPath() != null);
|
||||||
|
File chunksPath = new File(containerData.getChunksPath());
|
||||||
|
assertTrue(chunksPath.exists());
|
||||||
|
// Initially chunks folder should be empty.
|
||||||
|
assertTrue(chunksPath.listFiles().length == 0);
|
||||||
|
|
||||||
|
List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
|
||||||
|
for (int i = 0; i < (normalBlocks + deletedBlocks); i++) {
|
||||||
|
BlockID blockID = new BlockID(containerId, i);
|
||||||
|
BlockData blockData = new BlockData(blockID);
|
||||||
|
|
||||||
|
chunkList.clear();
|
||||||
|
for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
|
||||||
|
String chunkName = strBlock + i + strChunk + chunkCount;
|
||||||
|
long offset = chunkCount * chunkLen;
|
||||||
|
ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
|
||||||
|
chunkList.add(info.getProtoBufMessage());
|
||||||
|
chunkManager
|
||||||
|
.writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
|
||||||
|
new DispatcherContext.Builder()
|
||||||
|
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
|
||||||
|
.build());
|
||||||
|
chunkManager
|
||||||
|
.writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
|
||||||
|
new DispatcherContext.Builder()
|
||||||
|
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
blockData.setChunks(chunkList);
|
||||||
|
|
||||||
|
if (i >= normalBlocks) {
|
||||||
|
// deleted key
|
||||||
|
metadataStore.put(DFSUtil.string2Bytes(
|
||||||
|
OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()),
|
||||||
|
blockData.getProtoBufMessage().toByteArray());
|
||||||
|
} else {
|
||||||
|
// normal key
|
||||||
|
metadataStore.put(Longs.toByteArray(blockID.getLocalID()),
|
||||||
|
blockData.getProtoBufMessage().toByteArray());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user