diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index d5487b3602..c0e7d7847c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -24,12 +24,15 @@ import java.util.List; import java.util.Map; +import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; @@ -193,6 +196,10 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData, }).sum(); kvContainerData.setBytesUsed(bytesUsed); kvContainerData.setKeyCount(liveKeys.size()); + byte[] bcsId = metadata.getStore().get(DFSUtil.string2Bytes( + OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX)); + Preconditions.checkNotNull(bcsId); + kvContainerData.updateBlockCommitSequenceId(Longs.fromByteArray(bcsId)); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java new file mode 100644 index 0000000000..e5a3d2f2e7 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests end-to-end replication of a closed container to a datanode
+ * outside the original pipeline.
+ */
+public class TestContainerReplicationEndToEnd {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static String volumeName;
+  private static String bucketName;
+  private static String path;
+  private static XceiverClientManager xceiverClientManager;
+  private static long containerReportInterval;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    path = GenericTestUtils
+        .getTempPath(TestContainerReplicationEndToEnd.class.getSimpleName());
+    File baseDir = new File(path);
+    baseDir.mkdirs();
+    containerReportInterval = 2000;
+
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL,
+        containerReportInterval, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, containerReportInterval,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL,
+        2 * containerReportInterval, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1000,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        1000, TimeUnit.SECONDS);
+    conf.setLong("hdds.scm.replication.thread.interval",
+        containerReportInterval);
+
+    conf.setQuietMode(false);
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+            .build();
+    cluster.waitForClusterToBeReady();
+    cluster.getStorageContainerManager().getReplicationManager().start();
+    // the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    xceiverClientManager = new XceiverClientManager(conf);
+    volumeName = "testcontainerreplicationendtoend";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * The test simulates end-to-end container replication.
+   */
+  @Test
+  public void testContainerReplication() throws Exception {
+    String keyName = "testContainerReplication";
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey(keyName, 0, ReplicationType.RATIS,
+                ReplicationFactor.THREE, new HashMap<>());
+    byte[] testData = "ratis".getBytes();
+    // First write and flush creates a container in the datanode
+    key.write(testData);
+    key.flush();
+
+    KeyOutputStream groupOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    long containerID = omKeyLocationInfo.getContainerID();
+    PipelineID pipelineID =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(new ContainerID(containerID)).getPipelineID();
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(pipelineID);
+    key.close();
+
+    cluster.getStorageContainerManager().getContainerManager()
+        .updateContainerState(new ContainerID(containerID),
+            HddsProtos.LifeCycleEvent.FINALIZE);
+    // wait for the container to move to CLOSING state in SCM
+    Thread.sleep(2 * containerReportInterval);
+    DatanodeDetails oldReplicaNode = pipeline.getFirstNode();
+    // now move the container to CLOSED state on the datanode
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setContainerID(containerID);
+    request.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    xceiverClient.sendCommand(request.build());
+    // wait for the container to move to CLOSED state in SCM
+    Thread.sleep(2 * containerReportInterval);
+    Assert.assertTrue(
+        cluster.getStorageContainerManager().getContainerInfo(containerID)
+            .getState() == HddsProtos.LifeCycleState.CLOSED);
+    // shutdown the replica node
+    cluster.shutdownHddsDatanode(oldReplicaNode);
+    // now the container is under-replicated and will be copied to a different dn
+    HddsDatanodeService dnService = null;
+
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      Predicate<DatanodeDetails> p =
+          i -> i.getUuid().equals(dn.getDatanodeDetails().getUuid());
+      if (pipeline.getNodes().stream().noneMatch(p)) {
+        dnService = dn;
+      }
+    }
+
+    Assert.assertNotNull(dnService);
+    final HddsDatanodeService newReplicaNode = dnService;
+    // wait for the container to get replicated
+    GenericTestUtils.waitFor(() -> {
+      return newReplicaNode.getDatanodeStateMachine().getContainer()
+          .getContainerSet().getContainer(containerID) != null;
+    }, 500, 100000);
+    Assert.assertTrue(newReplicaNode.getDatanodeStateMachine().getContainer()
+        .getContainerSet().getContainer(containerID).getContainerData()
+        .getBlockCommitSequenceId() > 0);
+    // wait for SCM to update the replica map
+    Thread.sleep(5 * containerReportInterval);
+    // now shutdown the other dns of the original pipeline and try reading
+    // the key again
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      cluster.shutdownHddsDatanode(dn);
+    }
+    // This will read the data from the dn to which the container was
+    // replicated after it was closed.
+    ContainerTestHelper
+        .validateData(keyName, testData, objectStore, volumeName, bucketName);
+  }
+}