diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index cfbb6ae9da..a8ead77b5d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -55,6 +55,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls @@ -100,7 +101,7 @@ public class BlockOutputStream extends OutputStream { // The IOException will be set by response handling thread in case there is an // exception received in the response. If the exception is set, the next // request will fail upfront. - private IOException ioException; + private AtomicReference ioException; private ExecutorService responseExecutor; // the effective length of data flushed so far @@ -187,6 +188,7 @@ public BlockOutputStream(BlockID blockID, String key, writtenDataLength = 0; failedServers = Collections.emptyList(); bufferList = null; + ioException = new AtomicReference<>(null); } @@ -221,9 +223,8 @@ public BufferPool getBufferPool() { return bufferPool; } - @VisibleForTesting public IOException getIoException() { - return ioException; + return ioException.get(); } @VisibleForTesting @@ -372,10 +373,9 @@ private void handleFullBuffer() throws IOException { waitOnFlushFutures(); } } catch (InterruptedException | ExecutionException e) { - ioException = new IOException( - "Unexpected Storage Container Exception: " + e.toString(), e); + setIoException(e); adjustBuffersOnException(); - throw ioException; + throw getIoException(); } if (!commitIndex2flushedDataMap.isEmpty()) { watchForCommit( @@ -430,9 +430,9 @@ private void watchForCommit(long commitIndex) throws IOException { adjustBuffers(index); } catch (TimeoutException | InterruptedException | ExecutionException e) { LOG.warn("watchForCommit failed for index " + commitIndex, e); + setIoException(e); adjustBuffersOnException(); - throw new IOException( - "Unexpected Storage Container Exception: " + e.toString(), e); + throw getIoException(); } } @@ -461,7 +461,7 @@ ContainerCommandResponseProto> executePutBlock() throw new CompletionException(sce); } // if the ioException is not set, putBlock is successful - if (ioException == null) { + if (getIoException() == null) { BlockID responseBlockID = BlockID.getFromProtobuf( e.getPutBlock().getCommittedBlockLength().getBlockID()); Preconditions.checkState(blockID.getContainerBlockID() @@ -505,10 +505,9 @@ public void flush() throws IOException { } catch (InterruptedException | ExecutionException e) { // just set the exception here as well in order to maintain sanctity of // ioException field - ioException = new IOException( - "Unexpected Storage Container Exception: " + e.toString(), e); + setIoException(e); adjustBuffersOnException(); - throw ioException; + throw getIoException(); } } } @@ -580,10 +579,9 @@ public void close() throws IOException { try { handleFlush(); } catch (InterruptedException | ExecutionException e) { - ioException = new IOException( - "Unexpected Storage Container Exception: " + e.toString(), e); + setIoException(e); adjustBuffersOnException(); - throw ioException; + throw getIoException(); } finally { cleanup(false); } @@ -611,8 +609,9 @@ private void validateResponse( // if the ioException is already set, it means a prev request has failed // just throw the exception. The current operation will fail with the // original error - if (ioException != null) { - throw ioException; + IOException exception = getIoException(); + if (exception != null) { + throw exception; } ContainerProtocolCalls.validateContainerResponse(responseProto); } catch (StorageContainerException sce) { @@ -622,10 +621,12 @@ private void validateResponse( } } + private void setIoException(Exception e) { - if (ioException != null) { - ioException = new IOException( + if (getIoException() == null) { + IOException exception = new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); + ioException.compareAndSet(null, exception); } } @@ -659,9 +660,9 @@ public void cleanup(boolean invalidateClient) { private void checkOpen() throws IOException { if (xceiverClient == null) { throw new IOException("BlockOutputStream has been closed."); - } else if (ioException != null) { + } else if (getIoException() != null) { adjustBuffersOnException(); - throw ioException; + throw getIoException(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 8f09ff2527..a542191339 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -572,6 +572,17 @@ public List getPipelineReport() { } } + @VisibleForTesting + public List getPipelineIds() { + Iterable gids = server.getGroupIds(); + List pipelineIDs = new ArrayList<>(); + for (RaftGroupId groupId : gids) { + pipelineIDs.add(PipelineID.valueOf(groupId.getUuid())); + LOG.info("pipeline id {}", PipelineID.valueOf(groupId.getUuid())); + } + return pipelineIDs; + } + void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) { handlePipelineFailure(group.getGroupId(), roleInfoProto); } diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml index 32b2c03ad9..1110fb8d1d 100644 --- a/hadoop-hdds/pom.xml +++ b/hadoop-hdds/pom.xml @@ -46,7 +46,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> 0.5.0-SNAPSHOT - 0.4.0-1fc5ace-SNAPSHOT + 0.4.0-8fed368-SNAPSHOT 1.60 diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 012a2256b2..9e9bb39796 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.rest.response.*; import org.apache.ratis.protocol.AlreadyClosedException; +import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; import java.util.ArrayList; @@ -43,6 +44,7 @@ private OzoneClientUtils() {} add(ContainerNotOpenException.class); add(RaftRetryFailureException.class); add(AlreadyClosedException.class); + add(GroupMismatchException.class); }}; /** * Returns a BucketInfo object constructed using fields of the input diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index c04105c384..0d9529f8ea 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.protocol.AlreadyClosedException; +import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -423,8 +424,8 @@ private void handleException(BlockOutputStreamEntry streamEntry, streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long bufferedDataLen = computeBufferData(); LOG.warn("Encountered exception {}. The last committed block length is {}, " - + "uncommitted data length is {}", exception, - totalSuccessfulFlushedData, bufferedDataLen); + + "uncommitted data length is {} retry count {}", exception, + totalSuccessfulFlushedData, bufferedDataLen, retryCount); Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize); Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); @@ -435,7 +436,8 @@ private void handleException(BlockOutputStreamEntry streamEntry, } if (closedContainerException) { excludeList.addConatinerId(ContainerID.valueof(containerId)); - } else if (retryFailure || t instanceof TimeoutException) { + } else if (retryFailure || t instanceof TimeoutException + || t instanceof GroupMismatchException) { pipelineId = streamEntry.getPipeline().getId(); excludeList.addPipeline(pipelineId); } @@ -482,11 +484,12 @@ private void handleRetry(IOException exception, long len) throws IOException { throw e instanceof IOException ? (IOException) e : new IOException(e); } if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + String msg = ""; if (action.reason != null) { - LOG.error("Retry request failed. " + action.reason, - exception); + msg = "Retry request failed. " + action.reason; + LOG.error(msg, exception); } - throw exception; + throw new IOException(msg, exception); } // Throw the exception if the thread is interrupted diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index e94f0ac3e5..8c40aa30ec 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -181,6 +181,8 @@ void restartStorageContainerManager() throws InterruptedException, void restartHddsDatanode(int i, boolean waitForDatanode) throws InterruptedException, TimeoutException; + int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException; + /** * Restart a particular HddsDatanode. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 5cd08419c0..8018bab483 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -192,7 +192,8 @@ public List getHddsDatanodes() { return hddsDatanodes; } - private int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException { + @Override + public int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException { for (HddsDatanodeService service : hddsDatanodes) { if (service.getDatanodeDetails().equals(dn)) { return hddsDatanodes.indexOf(service); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 54cdff0228..f228dad69e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientRatis; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.container.common.helpers + .ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -35,7 +36,7 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.ratis.protocol.AlreadyClosedException; +import org.apache.ratis.protocol.RaftRetryFailureException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -510,7 +511,7 @@ public void test2DatanodesFailure() throws Exception { // and one flush for partial chunk key.flush(); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream - .getIoException()) instanceof AlreadyClosedException); + .getIoException()) instanceof RaftRetryFailureException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // now close the stream, It will update the ack length after watchForCommit @@ -1041,7 +1042,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { key.flush(); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream - .getIoException()) instanceof AlreadyClosedException); + .getIoException()) instanceof RaftRetryFailureException); Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -1183,7 +1184,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { key.flush(); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream - .getIoException()) instanceof AlreadyClosedException); + .getIoException()) instanceof RaftRetryFailureException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java index 2c3cfab045..13e3eff825 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java @@ -139,7 +139,13 @@ public void testContainerStateMachineFailures() throws Exception { .getContainer(omKeyLocationInfo.getContainerID()).getContainerData() .getContainerPath())); - key.close(); + try { + key.close(); + Assert.fail(); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getMessage().contains( + "Requested operation not allowed as ContainerState is UNHEALTHY")); + } // Make sure the container is marked unhealthy Assert.assertTrue( cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 68f1eccb2a..a8b72954b6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -142,7 +142,13 @@ public void testContainerStateMachineFailures() throws Exception { .getContainer().getContainerSet() .getContainer(omKeyLocationInfo.getContainerID()).getContainerData() .getContainerPath())); - key.close(); + try { + key.close(); + Assert.fail(); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getMessage().contains( + "Requested operation not allowed as ContainerState is UNHEALTHY")); + } long containerID = omKeyLocationInfo.getContainerID(); // Make sure the container is marked unhealthy diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java new file mode 100644 index 0000000000..381cf14e4c --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.ratis.protocol.GroupMismatchException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * Tests failure detection and handling in BlockOutputStream Class. + */ +public class TestOzoneClientRetriesOnException { + + private static MiniOzoneCluster cluster; + private OzoneConfiguration conf = new OzoneConfiguration(); + private OzoneClient client; + private ObjectStore objectStore; + private int chunkSize; + private int flushSize; + private int maxFlushSize; + private int blockSize; + private String volumeName; + private String bucketName; + private String keyString; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + @Before + public void init() throws Exception { + chunkSize = 100; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2); + conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s"); + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); + conf.setQuietMode(false); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(7) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "testblockoutputstreamwithretries"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + private String getKeyName() { + return UUID.randomUUID().toString(); + } + + /** + * Shutdown MiniDFSCluster. + */ + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testGroupMismatchExceptionHandling() throws Exception { + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + long containerID = + keyOutputStream.getStreamEntries().get(0).getBlockID().getContainerID(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerID)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + ContainerTestHelper.waitForPipelineClose(key, cluster, true); + key.flush(); + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof GroupMismatchException); + Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() + .contains(pipeline.getId())); + key.close(); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + validateData(keyName, data1); + } + + @Test + public void testMaxRetriesByOzoneClient() throws Exception { + String keyName = getKeyName(); + OzoneOutputStream key = + createKey(keyName, ReplicationType.RATIS, 4 * blockSize); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + List entries = keyOutputStream.getStreamEntries(); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 4); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + + OutputStream stream = entries.get(0).getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + List pipelineList = new ArrayList<>(); + long containerID; + for (BlockOutputStreamEntry entry : entries) { + containerID = entry.getBlockID().getContainerID(); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerID)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + pipelineList.add(pipeline.getId()); + } + ContainerTestHelper.waitForPipelineClose(key, cluster, false); + try { + key.write(data1); + } catch (IOException ioe) { + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof GroupMismatchException); + Assert.assertTrue(ioe.getMessage().contains( + "Retry request failed. retries get failed due to exceeded maximum " + + "allowed retries number: 3")); + } + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 0b618a0d70..dc5e8b43bb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -66,6 +66,8 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.security.token.Token; @@ -740,8 +742,76 @@ public static void waitForContainerClose(OzoneOutputStream outputStream, containerIdList.add(info.getContainerID()); } Assert.assertTrue(!containerIdList.isEmpty()); - ContainerTestHelper - .waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); + waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); + } + + public static void waitForPipelineClose(OzoneOutputStream outputStream, + MiniOzoneCluster cluster, boolean waitForContainerCreation) + throws Exception { + KeyOutputStream keyOutputStream = + (KeyOutputStream) outputStream.getOutputStream(); + List locationInfoList = + keyOutputStream.getLocationInfoList(); + List containerIdList = new ArrayList<>(); + for (OmKeyLocationInfo info : locationInfoList) { + containerIdList.add(info.getContainerID()); + } + Assert.assertTrue(!containerIdList.isEmpty()); + waitForPipelineClose(cluster, waitForContainerCreation, + containerIdList.toArray(new Long[0])); + } + + public static void waitForPipelineClose(MiniOzoneCluster cluster, + boolean waitForContainerCreation, Long... containerIdList) + throws TimeoutException, InterruptedException, IOException { + List pipelineList = new ArrayList<>(); + for (long containerID : containerIdList) { + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerID)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + if (!pipelineList.contains(pipeline)) { + pipelineList.add(pipeline); + } + List datanodes = pipeline.getNodes(); + + if (waitForContainerCreation) { + for (DatanodeDetails details : datanodes) { + // Client will issue write chunk and it will create the container on + // datanodes. + // wait for the container to be created + GenericTestUtils + .waitFor(() -> isContainerPresent(cluster, containerID, details), + 500, 100 * 1000); + Assert.assertTrue(isContainerPresent(cluster, containerID, details)); + + // make sure the container gets created first + Assert.assertFalse(ContainerTestHelper + .isContainerClosed(cluster, containerID, details)); + } + } + } + for (Pipeline pipeline1 : pipelineList) { + // issue pipeline destroy command + cluster.getStorageContainerManager().getPipelineManager() + .finalizeAndDestroyPipeline(pipeline1, false); + } + + // wait for the pipeline to get destroyed in the datanodes + for (Pipeline pipeline : pipelineList) { + for (DatanodeDetails dn : pipeline.getNodes()) { + XceiverServerSpi server = + cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn)) + .getDatanodeStateMachine().getContainer().getWriteChannel(); + Assert.assertTrue(server instanceof XceiverServerRatis); + XceiverServerRatis raftServer = (XceiverServerRatis) server; + GenericTestUtils.waitFor( + () -> (!raftServer.getPipelineIds().contains(pipeline.getId())), + 500, 100 * 1000); + } + } } public static void waitForContainerClose(MiniOzoneCluster cluster, @@ -785,13 +855,13 @@ public static void waitForContainerClose(MiniOzoneCluster cluster, // but not yet been used by the client. In such a case container is never // created. for (DatanodeDetails datanodeDetails : datanodes) { - GenericTestUtils.waitFor(() -> ContainerTestHelper - .isContainerClosed(cluster, containerID, datanodeDetails), 500, + GenericTestUtils.waitFor( + () -> isContainerClosed(cluster, containerID, datanodeDetails), 500, 15 * 1000); //double check if it's really closed // (waitFor also throws an exception) - Assert.assertTrue(ContainerTestHelper - .isContainerClosed(cluster, containerID, datanodeDetails)); + Assert.assertTrue( + isContainerClosed(cluster, containerID, datanodeDetails)); } index++; } diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index b243ccc205..11498944aa 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -29,7 +29,7 @@ 3.2.0 0.5.0-SNAPSHOT 0.5.0-SNAPSHOT - 0.4.0-1fc5ace-SNAPSHOT + 0.4.0-8fed368-SNAPSHOT 1.60 Crater Lake ${ozone.version}