diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 4c87b1950f..da77f1c5cd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -241,9 +241,12 @@ public class KeyValueHandler extends Handler {
         newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
         containerSet.addContainer(newContainer);
       } else {
-        throw new StorageContainerException("Container already exists with " +
-            "container Id " + containerID, ContainerProtos.Result
-            .CONTAINER_EXISTS);
+
+        // The create container request for an already existing container can
+        // arrive when the ContainerStateMachine reapplies the transaction
+        // on datanode restart. Just log a warning message here.
+        LOG.warn("Container already exists. " +
+            "Container Id: " + containerID);
       }
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -370,6 +373,7 @@ public class KeyValueHandler extends Handler {
 
   /**
    * Handles Close Container Request. An open container is closed.
+   * The Close Container call is idempotent.
    */
   ContainerCommandResponseProto handleCloseContainer(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 8bdae0f1fa..20598d9ec6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -231,21 +231,18 @@ public final class ChunkUtils {
    *
    * @param chunkFile - chunkFile to write data into.
    * @param info - chunk info.
-   * @return boolean isOverwrite
-   * @throws StorageContainerException
+   * @return true if the chunkFile exists and chunkOffset < chunkFile length,
+   * false otherwise.
    */
   public static boolean validateChunkForOverwrite(File chunkFile,
-      ChunkInfo info) throws StorageContainerException {
+      ChunkInfo info) {
 
     Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
 
     if (isOverWriteRequested(chunkFile, info)) {
       if (!isOverWritePermitted(info)) {
-        log.error("Rejecting write chunk request. Chunk overwrite " +
+        log.warn("Duplicate write chunk request. Chunk overwrite " +
             "without explicit request. {}", info.toString());
-        throw new StorageContainerException("Rejecting write chunk request. " +
-            "OverWrite flag required." + info.toString(),
-            OVERWRITE_FLAG_REQUIRED);
       }
       return true;
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index bd19441266..7fa0cfb92e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -89,11 +89,33 @@ public class BlockManagerImpl implements BlockManager {
     Preconditions.checkNotNull(db, "DB cannot be null here");
     long blockCommitSequenceId = data.getBlockCommitSequenceId();
+    byte[] blockCommitSequenceIdKey =
+        DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
+    byte[] blockCommitSequenceIdValue = db.get(blockCommitSequenceIdKey);
+
+    // The default blockCommitSequenceId for any block is 0. If the putBlock
+    // request does not come via Ratis (as in some test scenarios), it will
+    // be 0. In such cases, we should overwrite the block as well.
+    if (blockCommitSequenceIdValue != null && blockCommitSequenceId != 0) {
+      if (blockCommitSequenceId <= Longs
+          .fromByteArray(blockCommitSequenceIdValue)) {
+        // Since the blockCommitSequenceId stored in the db is greater than
+        // or equal to the blockCommitSequenceId to be updated, the putBlock
+        // transaction is being reapplied in the ContainerStateMachine on
+        // restart. It also implies that the given block must already exist
+        // in the db. Just log and return.
+        LOG.warn("blockCommitSequenceId " + Longs
+            .fromByteArray(blockCommitSequenceIdValue)
+            + " in the Container Db is greater than or equal to the"
+            + " supplied value " + blockCommitSequenceId + ". Ignoring it.");
+        return data.getSize();
+      }
+    }
 
     // update the blockData as well as BlockCommitSequenceId here
     BatchOperation batch = new BatchOperation();
     batch.put(Longs.toByteArray(data.getLocalID()),
         data.getProtoBufMessage().toByteArray());
-    batch.put(DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX),
+    batch.put(blockCommitSequenceIdKey,
         Longs.toByteArray(blockCommitSequenceId));
     db.writeBatch(batch);
     container.updateBlockCommitSequenceId(blockCommitSequenceId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
index ce317bd3ed..6fd8d5f589 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
@@ -87,6 +87,32 @@ public class ChunkManagerImpl implements ChunkManager {
 
       switch (stage) {
       case WRITE_DATA:
+        if (isOverwrite) {
+          // If the actual chunk file already exists here while writing the
+          // temp chunk file, then the same ozone client request has
+          // generated two raft log entries. This can happen either because
+          // the retryCache expired in Ratis, or because of a log index
+          // mismatch/corruption in Ratis. Two approaches are possible:
+          // 1. Read the complete data in the actual chunk file, verify the
+          // data integrity, and rewrite the chunk only if it mismatches.
+          // 2. Delete the chunk file and write the chunk again.
+          // For now, let's rewrite the chunk file.
+          // TODO: once checksum support for write chunks gets plugged in,
+          // the checksum of the actual chunk file can be verified against
+          // the data to be written here, which should be efficient; if it
+          // matches we can safely return without rewriting.
+          LOG.warn("Chunk file already exists: " + chunkFile + ". Deleting it.");
+          FileUtil.fullyDelete(chunkFile);
+        }
+        if (tmpChunkFile.exists()) {
+          // If the tmp chunk file already exists, it means the raft log got
+          // appended, but later the log entry got truncated in Ratis,
+          // leaving garbage behind.
+          // TODO: once checksum support for data chunks gets plugged in,
+          // compare the checksums here instead of rewriting the chunk.
+          LOG.warn("tmpChunkFile already exists: " + tmpChunkFile
+              + ". Overwriting it.");
+        }
         // Initially writes to temporary chunk file.
         ChunkUtils.writeData(tmpChunkFile, info, data, volumeIOStats);
         // No need to increment container stats here, as still data is not
@@ -95,6 +121,15 @@ public class ChunkManagerImpl implements ChunkManager {
       case COMMIT_DATA:
         // commit the data, means move chunk data from temporary chunk file
        // to actual chunk file.
+        if (isOverwrite) {
+          // If the actual chunk file already exists, it implies the write
+          // chunk transaction in the ContainerStateMachine is being
+          // reapplied. This can happen when a node restarts.
+          // TODO: verify the checksums for the existing chunk file and the
+          // chunkInfo to be committed here.
+          LOG.warn("Chunk file already exists: " + chunkFile);
+          return;
+        }
         commitChunk(tmpChunkFile, chunkFile);
         // Increment container stats here, as we commit the data.
         containerData.incrBytesUsed(info.getLen());
@@ -200,6 +235,14 @@ public class ChunkManagerImpl implements ChunkManager {
       if (containerData.getLayOutVersion() == ChunkLayOutVersion
           .getLatestVersion().getVersion()) {
         File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+
+        // If the chunk file does not exist, it might have already been
+        // deleted. The call might be a reapply of transactions on datanode
+        // restart.
+        if (!chunkFile.exists()) {
+          LOG.warn("Chunk file does not exist. Chunk info: " + info.toString());
+          return;
+        }
         if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
           FileUtil.fullyDelete(chunkFile);
           containerData.decrBytesUsed(chunkFile.length());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
new file mode 100644
index 0000000000..c69a94c2c4
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.
+ ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.
+ SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.protocolPB.
+ StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Tests the idempotent operations in ContainerStateMachine.
+ */
+public class TestContainerStateMachineIdempotency {
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration ozoneConfig;
+ private static StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient;
+ private static XceiverClientManager xceiverClientManager;
+ private static String containerOwner = "OZONE";
+
+ @BeforeClass
+ public static void init() throws Exception {
+ ozoneConfig = new OzoneConfiguration();
+ ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ cluster =
+ MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+ cluster.waitForClusterToBeReady();
+ storageContainerLocationClient =
+ cluster.getStorageContainerLocationClient();
+ xceiverClientManager = new XceiverClientManager(ozoneConfig);
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
+ }
+
+ @Test
+ public void testContainerStateMachineIdempotency() throws Exception {
+ String traceID = UUID.randomUUID().toString();
+ ContainerWithPipeline container = storageContainerLocationClient
+ .allocateContainer(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, containerOwner);
+ long containerID = container.getContainerInfo().getContainerID();
+ Pipeline pipeline = container.getPipeline();
+ XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+ try {
+ //create the container
+ ContainerProtocolCalls.createContainer(client, containerID, traceID);
+ // call create container again; the duplicate request should be ignored
+ ContainerProtocolCalls.createContainer(client, containerID, traceID);
+ BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+ byte[] data =
+ RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
+ ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+ ContainerTestHelper
+ .getWriteChunkRequest(container.getPipeline(), blockID,
+ data.length);
+ client.sendCommand(writeChunkRequest);
+
+ // Make the write chunk request again without requesting overwrite
+ client.sendCommand(writeChunkRequest);
+ // Now, explicitly make a putBlock request for the block.
+ ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+ ContainerTestHelper
+ .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
+ client.sendCommand(putKeyRequest).getPutBlock();
+ // send the putBlock again
+ client.sendCommand(putKeyRequest);
+
+ // close container call
+ ContainerProtocolCalls.closeContainer(client, containerID, traceID);
+ ContainerProtocolCalls.closeContainer(client, containerID, traceID);
+ } catch (IOException ioe) {
+ Assert.fail("Container operation failed" + ioe);
+ }
+ xceiverClientManager.releaseClient(client);
+ }
+}
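
Reviewer note, not part of the patch: the BlockManagerImpl change above makes putBlock idempotent by keeping a blockCommitSequenceId high-water mark and skipping any transaction whose id is not newer. Below is a minimal sketch of that pattern; the names IdempotentPutSketch, applyPut, and lastCommittedId are hypothetical and do not exist in Ozone.

import java.util.concurrent.atomic.AtomicLong;

public class IdempotentPutSketch {
  // Highest blockCommitSequenceId applied so far (the high-water mark).
  private final AtomicLong lastCommittedId = new AtomicLong(0);

  // Applies a putBlock-style transaction only when its sequence id is newer
  // than the last applied one. A transaction reapplied on datanode restart
  // carries a smaller or equal id and is skipped, making apply idempotent.
  public boolean applyPut(long blockCommitSequenceId, Runnable writeBatch) {
    // Id 0 means the request did not come through Ratis (e.g. unit tests),
    // so there is no replay to guard against: always apply.
    if (blockCommitSequenceId != 0
        && blockCommitSequenceId <= lastCommittedId.get()) {
      return false; // replayed transaction, already applied: skip it
    }
    writeBatch.run();                           // persist block data and id
    lastCommittedId.set(blockCommitSequenceId); // advance the high-water mark
    return true;
  }
}
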
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index d9e0d297f8..bea00fecaf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -462,15 +462,7 @@ public class TestContainerPersistence {
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
- try {
- chunkManager.writeChunk(container, blockID, info, data, COMBINED);
- } catch (StorageContainerException ex) {
- Assert.assertTrue(ex.getMessage().contains(
- "Rejecting write chunk request. OverWrite flag required"));
- Assert.assertEquals(ex.getResult(),
- ContainerProtos.Result.OVERWRITE_FLAG_REQUIRED);
- }
-
+ chunkManager.writeChunk(container, blockID, info, data, COMBINED);
// With the overwrite flag it should work now.
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
@@ -478,7 +470,7 @@ public class TestContainerPersistence {
Assert.assertEquals(datalen, bytesUsed);
long bytesWrite = container.getContainerData().getWriteBytes();
- Assert.assertEquals(datalen * 2, bytesWrite);
+ Assert.assertEquals(datalen * 3, bytesWrite);
}
/**
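
Reviewer note, not part of the patch: bytesWrite is now asserted as datalen * 3 because all three writeChunk calls in the test succeed and are counted toward writeBytes: the initial write, the duplicate write without the overwrite flag (now logged as a warning instead of being rejected with OVERWRITE_FLAG_REQUIRED), and the explicit overwrite. Previously the second call threw, so only two writes were counted (datalen * 2).
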
@@ -748,10 +740,11 @@ public class TestContainerPersistence {
}
- private BlockData writeBlockHelper(BlockID blockID)
+ private BlockData writeBlockHelper(BlockID blockID, int i)
throws IOException, NoSuchAlgorithmException {
ChunkInfo info = writeChunkHelper(blockID);
BlockData blockData = new BlockData(blockID);
+ blockData.setBlockCommitSequenceId((long) i);
List