HDDS-1843. Undetectable corruption after restart of a datanode. Contributed by Shashikant Banerjee(#1364).

This commit is contained in:
Shashikant Banerjee 2019-09-09 22:43:20 +05:30
parent 147f98629c
commit 469165e6f2
13 changed files with 218 additions and 61 deletions

View File

@ -248,8 +248,9 @@ message ContainerDataProto {
optional ContainerType containerType = 10 [default = KeyValueContainer];
}
message ContainerIdSetProto {
repeated int64 containerId = 1;
message Container2BCSIDMapProto {
// repeated Container2BCSIDMapEntryProto container2BCSID = 1;
map <int64, int64> container2BCSID = 1;
}
enum ContainerType {

View File

@ -240,14 +240,46 @@ public Set<Long> getMissingContainerSet() {
}
/**
* Builds the missing container set by taking a diff total no containers
* actually found and number of containers which actually got created.
* Builds the missing container set by taking a diff between total no
* containers actually found and number of containers which actually
* got created. It also validates the BCSID stored in the snapshot file
* for each container as against what is reported in containerScan.
* This will only be called during the initialization of Datanode Service
* when it still not a part of any write Pipeline.
* @param createdContainerSet ContainerId set persisted in the Ratis snapshot
* @param container2BCSIDMap Map of containerId to BCSID persisted in the
* Ratis snapshot
*/
public void buildMissingContainerSet(Set<Long> createdContainerSet) {
missingContainerSet.addAll(createdContainerSet);
missingContainerSet.removeAll(containerMap.keySet());
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
container2BCSIDMap.entrySet().parallelStream().forEach((mapEntry) -> {
long id = mapEntry.getKey();
if (!containerMap.containsKey(id)) {
LOG.warn("Adding container {} to missing container set.", id);
missingContainerSet.add(id);
} else {
Container container = containerMap.get(id);
long containerBCSID = container.getBlockCommitSequenceId();
long snapshotBCSID = mapEntry.getValue();
if (containerBCSID < snapshotBCSID) {
LOG.warn(
"Marking container {} unhealthy as reported BCSID {} is smaller"
+ " than ratis snapshot recorded value {}", id,
containerBCSID, snapshotBCSID);
// just mark the container unhealthy. Once the DatanodeStateMachine
// thread starts it will send container report to SCM where these
// unhealthy containers would be detected
try {
container.markContainerUnhealthy();
} catch (StorageContainerException sce) {
// The container will still be marked unhealthy in memory even if
// exception occurs. It won't accept any new transactions and will
// be handled by SCM. Eve if dn restarts, it will still be detected
// as unheathy as its BCSID won't change.
LOG.error("Unable to persist unhealthy state for container {}", id);
}
}
}
});
}
}

View File

@ -135,8 +135,10 @@ private boolean canIgnoreException(Result result) {
}
@Override
public void buildMissingContainerSet(Set<Long> createdContainerSet) {
containerSet.buildMissingContainerSet(createdContainerSet);
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
containerSet
.buildMissingContainerSetAndValidate(container2BCSIDMap);
}
@Override
@ -185,9 +187,9 @@ private ContainerCommandResponseProto dispatchRequest(
cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext == null
|| dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMBINED);
Set<Long> containerIdSet = null;
Map<Long, Long> container2BCSIDMap = null;
if (dispatcherContext != null) {
containerIdSet = dispatcherContext.getCreateContainerSet();
container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
}
if (isWriteCommitStage) {
// check if the container Id exist in the loaded snapshot file. if
@ -196,9 +198,10 @@ private ContainerCommandResponseProto dispatchRequest(
// snapshot.
// just add it to the list, and remove it from missing container set
// as it might have been added in the list during "init".
Preconditions.checkNotNull(containerIdSet);
if (!containerIdSet.contains(containerID)) {
containerIdSet.add(containerID);
Preconditions.checkNotNull(container2BCSIDMap);
if (container2BCSIDMap.get(containerID) == null) {
container2BCSIDMap
.put(containerID, container.getBlockCommitSequenceId());
containerSet.getMissingContainerSet().remove(containerID);
}
}
@ -228,11 +231,14 @@ private ContainerCommandResponseProto dispatchRequest(
audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(isWriteStage && containerIdSet != null
Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
|| dispatcherContext == null);
if (containerIdSet != null) {
if (container2BCSIDMap != null) {
// adds this container to list of containers created in the pipeline
containerIdSet.add(containerID);
// with initial BCSID recorded as 0.
Preconditions
.checkArgument(!container2BCSIDMap.containsKey(containerID));
container2BCSIDMap.put(containerID, Long.valueOf(0));
}
container = getContainer(containerID);
}
@ -313,7 +319,8 @@ private ContainerCommandResponseProto dispatchRequest(
sendCloseContainerActionIfNeeded(container);
}
if(result == Result.SUCCESS) {
if (result == Result.SUCCESS) {
updateBCSID(container, dispatcherContext, cmdType);
audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
} else {
audit(action, eventType, params, AuditEventStatus.FAILURE,
@ -329,6 +336,22 @@ private ContainerCommandResponseProto dispatchRequest(
}
}
private void updateBCSID(Container container,
DispatcherContext dispatcherContext, ContainerProtos.Type cmdType) {
if (dispatcherContext != null && (cmdType == ContainerProtos.Type.PutBlock
|| cmdType == ContainerProtos.Type.PutSmallFile)) {
Preconditions.checkNotNull(container);
long bcsID = container.getBlockCommitSequenceId();
long containerId = container.getContainerData().getContainerID();
Map<Long, Long> container2BCSIDMap;
container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
Preconditions.checkNotNull(container2BCSIDMap);
Preconditions.checkArgument(container2BCSIDMap.containsKey(containerId));
// updates the latest BCSID on every putBlock or putSmallFile
// transaction over Ratis.
container2BCSIDMap.computeIfPresent(containerId, (u, v) -> v = bcsID);
}
}
/**
* Create a container using the input container request.
* @param containerRequest - the container request which requires container

View File

@ -155,6 +155,13 @@ ContainerReplicaProto getContainerReport()
void updateBlockCommitSequenceId(long blockCommitSequenceId);
/**
* Returns the blockCommitSequenceId.
*/
long getBlockCommitSequenceId();
/**
* check and report the structural integrity of the container.
* @return true if the integrity checks pass
* Scan the container metadata to detect corruption.
*/
boolean scanMetaData();

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import java.util.Set;
import java.util.Map;
/**
* Dispatcher acts as the bridge between the transport layer and
@ -62,9 +62,9 @@ void validateContainerCommand(
/**
* finds and builds the missing containers in case of a lost disk etc
* in the ContainerSet.
* in the ContainerSet. It also validates the BCSID of the containers found.
*/
void buildMissingContainerSet(Set<Long> createdContainers);
void buildMissingContainerSetAndValidate(Map<Long, Long> container2BCSIDMap);
/**
* Shutdown Dispatcher services.

View File

@ -46,7 +46,7 @@
.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerIdSetProto;
Container2BCSIDMapProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -88,8 +88,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.io.FileOutputStream;
import java.io.FileInputStream;
@ -146,7 +144,7 @@ public class ContainerStateMachine extends BaseStateMachine {
CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
// keeps track of the containers created per pipeline
private final Set<Long> createContainerSet;
private final Map<Long, Long> container2BCSIDMap;
private ExecutorService[] executors;
private final Map<Long, Long> applyTransactionCompletionMap;
private final Cache<Long, ByteString> stateMachineDataCache;
@ -181,7 +179,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
.maximumSize(chunkExecutor.getCorePoolSize()).build();
this.isBlockTokenEnabled = isBlockTokenEnabled;
this.tokenVerifier = tokenVerifier;
this.createContainerSet = new ConcurrentSkipListSet<>();
this.container2BCSIDMap = new ConcurrentHashMap<>();
final int numContainerOpExecutors = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
@ -244,14 +242,15 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
// initialize the dispatcher with snapshot so that it build the missing
// container list
try (FileInputStream fin = new FileInputStream(snapshotFile)) {
byte[] containerIds = IOUtils.toByteArray(fin);
ContainerProtos.ContainerIdSetProto proto =
ContainerProtos.ContainerIdSetProto.parseFrom(containerIds);
byte[] container2BCSIDData = IOUtils.toByteArray(fin);
ContainerProtos.Container2BCSIDMapProto proto =
ContainerProtos.Container2BCSIDMapProto
.parseFrom(container2BCSIDData);
// read the created containers list from the snapshot file and add it to
// the createContainerSet here.
// createContainerSet will further grow as and when containers get created
createContainerSet.addAll(proto.getContainerIdList());
dispatcher.buildMissingContainerSet(createContainerSet);
// the container2BCSIDMap here.
// container2BCSIDMap will further grow as and when containers get created
container2BCSIDMap.putAll(proto.getContainer2BCSIDMap());
dispatcher.buildMissingContainerSetAndValidate(container2BCSIDMap);
}
return last.getIndex();
}
@ -263,8 +262,9 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
* @throws IOException
*/
public void persistContainerSet(OutputStream out) throws IOException {
ContainerIdSetProto.Builder builder = ContainerIdSetProto.newBuilder();
builder.addAllContainerId(createContainerSet);
Container2BCSIDMapProto.Builder builder =
Container2BCSIDMapProto.newBuilder();
builder.putAllContainer2BCSID(container2BCSIDMap);
// TODO : while snapshot is being taken, deleteContainer call should not
// should not happen. Lock protection will be required if delete
// container happens outside of Ratis.
@ -433,7 +433,7 @@ private CompletableFuture<Message> handleWriteChunk(
.setTerm(term)
.setLogIndex(entryIndex)
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setCreateContainerSet(createContainerSet)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
// ensure the write chunk happens asynchronously in writeChunkExecutor pool
// thread.
@ -697,8 +697,9 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
builder
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
}
if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
builder.setCreateContainerSet(createContainerSet);
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
|| cmdType == Type.PutBlock) {
builder.setContainer2BCSIDMap(container2BCSIDMap);
}
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
@ -811,7 +812,7 @@ public void notifyGroupRemove() {
// Make best effort to quasi-close all the containers on group removal.
// Containers already in terminal state like CLOSED or UNHEALTHY will not
// be affected.
for (Long cid : createContainerSet) {
for (Long cid : container2BCSIDMap.keySet()) {
try {
containerController.markContainerForClose(cid);
containerController.quasiCloseContainer(cid);

View File

@ -20,7 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.Set;
import java.util.Map;
/**
* DispatcherContext class holds transport protocol specific context info
@ -45,15 +45,15 @@ public enum WriteChunkStage {
// the log index in Ratis log to which the request belongs to
private final long logIndex;
private final Set<Long> createContainerSet;
private final Map<Long, Long> container2BCSIDMap;
private DispatcherContext(long term, long index, WriteChunkStage stage,
boolean readFromTmpFile, Set<Long> containerSet) {
boolean readFromTmpFile, Map<Long, Long> container2BCSIDMap) {
this.term = term;
this.logIndex = index;
this.stage = stage;
this.readFromTmpFile = readFromTmpFile;
this.createContainerSet = containerSet;
this.container2BCSIDMap = container2BCSIDMap;
}
public long getLogIndex() {
@ -72,8 +72,8 @@ public WriteChunkStage getStage() {
return stage;
}
public Set<Long> getCreateContainerSet() {
return createContainerSet;
public Map<Long, Long> getContainer2BCSIDMap() {
return container2BCSIDMap;
}
/**
@ -84,7 +84,7 @@ public static final class Builder {
private boolean readFromTmpFile = false;
private long term;
private long logIndex;
private Set<Long> createContainerSet;
private Map<Long, Long> container2BCSIDMap;
/**
* Sets the WriteChunkStage.
@ -131,13 +131,13 @@ public Builder setLogIndex(long index) {
}
/**
* Sets the createContainerSet to contain all the containerIds per
* Sets the container2BCSIDMap to contain all the containerIds per
* RaftGroup.
* @param set createContainerSet
* @param map container2BCSIDMap
* @return Builder
*/
public Builder setCreateContainerSet(Set<Long> set) {
this.createContainerSet = set;
public Builder setContainer2BCSIDMap(Map<Long, Long> map) {
this.container2BCSIDMap = map;
return this;
}
/**
@ -147,7 +147,7 @@ public Builder setCreateContainerSet(Set<Long> set) {
*/
public DispatcherContext build() {
return new DispatcherContext(term, logIndex, stage, readFromTmpFile,
createContainerSet);
container2BCSIDMap);
}
}

View File

@ -611,6 +611,11 @@ public void updateBlockCommitSequenceId(long blockCommitSequenceId) {
containerData.updateBlockCommitSequenceId(blockCommitSequenceId);
}
@Override
public long getBlockCommitSequenceId() {
return containerData.getBlockCommitSequenceId();
}
/**
* Returns KeyValueContainerReport for the KeyValueContainer.

View File

@ -282,5 +282,4 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
public void shutdown() {
BlockUtils.shutdownCache(ContainerCache.getInstance(config));
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.client.rpc;
import com.google.common.primitives.Longs;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
@ -26,6 +27,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@ -39,7 +41,9 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@ -348,4 +352,88 @@ public void testApplyTransactionFailure() throws Exception {
FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
}
@Test
public void testValidateBCSIDOnDnRestart() throws Exception {
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
ContainerData containerData =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID())
.getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
KeyValueContainerData keyValueContainerData =
(KeyValueContainerData) containerData;
key.close();
long containerID = omKeyLocationInfo.getContainerID();
cluster.shutdownHddsDatanode(
cluster.getHddsDatanodes().get(0).getDatanodeDetails());
// delete the container db file
FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
cluster.restartHddsDatanode(
cluster.getHddsDatanodes().get(0).getDatanodeDetails(), true);
OzoneContainer ozoneContainer =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer();
// make sure the missing containerSet is not empty
HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
Assert
.assertTrue(dispatcher.getMissingContainerSet().contains(containerID));
// write a new key
key = objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis1".getBytes());
key.flush();
groupOutputStream = (KeyOutputStream) key.getOutputStream();
locationInfoList = groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
omKeyLocationInfo = locationInfoList.get(0);
key.close();
containerID = omKeyLocationInfo.getContainerID();
containerData = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID()).getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
keyValueContainerData = (KeyValueContainerData) containerData;
ReferenceCountedDB db = BlockUtils.
getDB(keyValueContainerData, conf);
byte[] blockCommitSequenceIdKey =
DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
// modify the bcsid for the container in the ROCKS DB tereby inducing
// corruption
db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
db.decrementReference();
// shutdown of dn will take a snapsot which will persist the valid BCSID
// recorded in the container2BCSIDMap in ContainerStateMachine
cluster.shutdownHddsDatanode(
cluster.getHddsDatanodes().get(0).getDatanodeDetails());
// after the restart, there will be a mismatch in BCSID of what is recorded
// in the and what is there in RockSDB and hence the container would be
// marked unhealthy
cluster.restartHddsDatanode(
cluster.getHddsDatanodes().get(0).getDatanodeDetails(), true);
// Make sure the container is marked unhealthy
Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet().getContainer(containerID)
.getContainerState()
== ContainerProtos.ContainerDataProto.State.UNHEALTHY);
}
}

View File

@ -58,7 +58,7 @@
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.util.function.CheckedBiConsumer;
import java.util.Set;
import java.util.Map;
import java.util.function.BiConsumer;
import org.junit.Test;
@ -230,7 +230,8 @@ public void setScmId(String scmId) {
}
@Override
public void buildMissingContainerSet(Set<Long> createdContainerSet) {
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
}
}
}

View File

@ -71,7 +71,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Set;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
@ -293,7 +292,8 @@ public void setScmId(String scmId) {
}
@Override
public void buildMissingContainerSet(Set<Long> createdContainerSet) {
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
}
}
}

View File

@ -71,7 +71,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
@ -284,8 +284,8 @@ public void setScmId(String scmId) {
}
@Override
public void buildMissingContainerSet(Set<Long> createdContainerSet) {
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
}
}
}