From 8ef2365ffd8fca888b23ad0e3afb7b0e09e3a5e0 Mon Sep 17 00:00:00 2001
From: Shashikant Banerjee
Date: Thu, 18 Jul 2019 19:57:12 +0530
Subject: [PATCH] HDDS-1779. TestWatchForCommit tests are flaky. Contributed by
 Shashikant Banerjee. (#1071)

---
 .../client/rpc/Test2WayCommitInRatis.java    | 156 ++++++++++++++++++
 .../ozone/client/rpc/TestWatchForCommit.java | 100 ++++-------
 2 files changed, 187 insertions(+), 69 deletions(-)
 create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
new file mode 100644
index 0000000000..cf570d28f7
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+
+/**
+ * This class tests the 2 way commit in Ratis.
+ */
+public class Test2WayCommitInRatis {
+
+  private MiniOzoneCluster cluster;
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private String volumeName;
+  private String bucketName;
+  private int chunkSize;
+  private int flushSize;
+  private int maxFlushSize;
+  private int blockSize;
+  private StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static String containerOwner = "OZONE";
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  private void startCluster(OzoneConfiguration conf) throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+        1, TimeUnit.SECONDS);
+
+    conf.setQuietMode(false);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    volumeName = "watchforcommithandlingtest";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    storageContainerLocationClient = cluster
+        .getStorageContainerLocationClient();
+  }
+
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  private void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+
+  @Test
+  public void test2WayCommitForRetryfailure() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
+    startCluster(conf);
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    XceiverClientSpi xceiverClient = clientManager
+        .acquireClient(container1.getPipeline());
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertEquals(container1.getPipeline(),
+        xceiverClient.getPipeline());
+    Pipeline pipeline = xceiverClient.getPipeline();
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    XceiverClientReply reply = xceiverClient.sendCommandAsync(
+        ContainerTestHelper.getCreateContainerRequest(
+            container1.getContainerInfo().getContainerID(),
+            xceiverClient.getPipeline()));
+    reply.getResponse().get();
+    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    reply = xceiverClient.sendCommandAsync(ContainerTestHelper
+        .getCloseContainer(pipeline,
+            container1.getContainerInfo().getContainerID()));
+    reply.getResponse().get();
+    xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
+
+    // commitInfo Map will be reduced to 2 here
+    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
+    clientManager.releaseClient(xceiverClient, false);
+    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
+    Assert
+        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
+    logCapturer.stopCapturing();
+    shutdown();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 90292eae09..9b59349119 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -46,7 +46,9 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -135,7 +137,10 @@ public class TestWatchForCommit {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        1, TimeUnit.SECONDS);
     startCluster(conf);
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
@@ -178,31 +183,24 @@
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
-
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     // writtenDataLength as well flushedDataLength will be updated here
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
     Assert.assertEquals(maxFlushSize,
         blockOutputStream.getTotalDataFlushedLength());
-
     // since data equals to maxBufferSize is written, this will be a blocking
     // call and hence will wait for atleast flushSize worth of data to get
     // acked by all servers right here
     Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
-
     // watchForCommit will clean up atleast one entry from the map where each
     // entry corresponds to flushSize worth of data
     Assert.assertTrue(
         blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
-
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
-
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -213,19 +211,15 @@
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 8,
         metrics.getTotalOpCount());
-
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
-
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
     Assert.assertEquals(dataLength,
         blockOutputStream.getTotalDataFlushedLength());
     // flush will make sure one more entry gets updated in the map
     Assert.assertTrue(
         blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
-
     XceiverClientRatis raftClient =
         (XceiverClientRatis) blockOutputStream.getXceiverClient();
     Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
@@ -235,11 +229,9 @@
     // again write data with more than max buffer limit. This will call
     // watchForCommit again. Since the commit will happen 2 way, the
     // commitInfoMap will get updated for servers which are alive
-
     // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
     // once exception is hit
     key.write(data1);
-
     // As a part of handling the exception, 4 failed writeChunks will be
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
@@ -282,7 +274,7 @@
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     ContainerWithPipeline container1 = storageContainerLocationClient
@@ -303,8 +295,11 @@
     cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
     cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
     try {
-      // just watch for a lo index which in not updated in the commitInfo Map
-      xceiverClient.watchForCommit(index + 1, 3000);
+      // just watch for a log index which is not updated in the commitInfo Map
+      // as well as there is no logIndex generated in Ratis.
+      // The basic idea here is just to test if it throws an exception.
+      xceiverClient
+          .watchForCommit(index + new Random().nextInt(100) + 10, 3000);
       Assert.fail("expected exception not thrown");
     } catch (Exception e) {
       Assert.assertTrue(
@@ -321,7 +316,7 @@
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 100,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     ContainerWithPipeline container1 = storageContainerLocationClient
@@ -343,67 +338,30 @@
     cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
     // again write data with more than max buffer limit. This wi
     try {
-      // just watch for a lo index which in not updated in the commitInfo Map
-      xceiverClient.watchForCommit(index + 1, 20000);
+      // just watch for a log index which is not updated in the commitInfo Map
+      // as well as there is no logIndex generated in Ratis.
+      // The basic idea here is just to test if it throws an exception.
+      xceiverClient
+          .watchForCommit(index + new Random().nextInt(100) + 10, 20000);
       Assert.fail("expected exception not thrown");
     } catch (Exception e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof RaftRetryFailureException);
+      Assert.assertTrue(e instanceof ExecutionException);
+      // since the timeout value is quite long, the watch request will either
+      // fail with NotReplicated exception, RetryFailureException or
+      // RuntimeException
+      Assert.assertFalse(HddsClientUtils
+          .checkForException(e) instanceof TimeoutException);
     }
     clientManager.releaseClient(xceiverClient, false);
     shutdown();
   }
 
-  @Test
-  public void test2WayCommitForRetryfailure() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 8);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    ContainerWithPipeline container1 = storageContainerLocationClient
-        .allocateContainer(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE, containerOwner);
-    XceiverClientSpi xceiverClient = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, xceiverClient.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        xceiverClient.getPipeline());
-    Pipeline pipeline = xceiverClient.getPipeline();
-    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    XceiverClientReply reply = xceiverClient.sendCommandAsync(
-        ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            xceiverClient.getPipeline()));
-    reply.getResponse().get();
-    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    reply = xceiverClient.sendCommandAsync(ContainerTestHelper
-        .getCloseContainer(pipeline,
-            container1.getContainerInfo().getContainerID()));
-    reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
-
-    // commitInfo Map will be reduced to 2 here
-    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
-    clientManager.releaseClient(xceiverClient, false);
-    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
-    Assert
-        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
-    logCapturer.stopCapturing();
-    shutdown();
-  }
-
   @Test
   public void test2WayCommitForTimeoutException() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
@@ -477,8 +435,12 @@
     pipelineList.add(pipeline);
     ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
    try {
-      // just watch for a lo index which in not updated in the commitInfo Map
-      xceiverClient.watchForCommit(reply.getLogIndex() + 1, 20000);
+      // just watch for a log index which is not updated in the commitInfo Map
+      // as well as there is no logIndex generated in Ratis.
+      // The basic idea here is just to test if it throws an exception.
+      xceiverClient
+          .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10,
+              20000);
       Assert.fail("Expected exception not thrown");
     } catch(Exception e) {
       Assert.assertTrue(HddsClientUtils