diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java index 94a4b9486e..eb215d63a4 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java @@ -102,4 +102,10 @@ public static ExcludeList getFromProtoBuf( }); return excludeList; } + + public void clear() { + datanodes.clear(); + containerIds.clear(); + pipelineIds.clear(); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 0d9529f8ea..c1f195f14c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -295,60 +295,66 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) throws IOException { int succeededAllocates = 0; while (len > 0) { - if (streamEntries.size() <= currentStreamIndex) { - Preconditions.checkNotNull(omClient); - // allocate a new block, if a exception happens, log an error and - // throw exception to the caller directly, and the write fails. - try { - allocateNewBlock(currentStreamIndex); - succeededAllocates += 1; - } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " - + "allocated " + succeededAllocates + " blocks for this write."); - throw ioe; - } - } - // in theory, this condition should never violate due the check above - // still do a sanity check. - Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); - BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); - - // length(len) will be in int range if the call is happening through - // write API of blockOutputStream. Length can be in long range if it comes - // via Exception path. - int writeLen = Math.min((int)len, (int) current.getRemaining()); - long currentPos = current.getWrittenDataLength(); try { - if (retry) { - current.writeOnRetry(len); - } else { - current.write(b, off, writeLen); - offset += writeLen; + if (streamEntries.size() <= currentStreamIndex) { + Preconditions.checkNotNull(omClient); + // allocate a new block, if a exception happens, log an error and + // throw exception to the caller directly, and the write fails. + try { + allocateNewBlock(currentStreamIndex); + succeededAllocates += 1; + } catch (IOException ioe) { + LOG.error("Try to allocate more blocks for write failed, already " + + "allocated " + succeededAllocates + + " blocks for this write."); + throw ioe; + } } - } catch (IOException ioe) { - // for the current iteration, totalDataWritten - currentPos gives the - // amount of data already written to the buffer + // in theory, this condition should never violate due the check above + // still do a sanity check. + Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); + BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); - // In the retryPath, the total data to be written will always be equal - // to or less than the max length of the buffer allocated. - // The len specified here is the combined sum of the data length of - // the buffers - Preconditions.checkState(!retry || len <= streamBufferMaxSize); - int dataWritten = (int) (current.getWrittenDataLength() - currentPos); - writeLen = retry ? (int) len : dataWritten; - // In retry path, the data written is already accounted in offset. - if (!retry) { - offset += writeLen; + // length(len) will be in int range if the call is happening through + // write API of blockOutputStream. Length can be in long range if it comes + // via Exception path. + int writeLen = Math.min((int) len, (int) current.getRemaining()); + long currentPos = current.getWrittenDataLength(); + try { + if (retry) { + current.writeOnRetry(len); + } else { + current.write(b, off, writeLen); + offset += writeLen; + } + } catch (IOException ioe) { + // for the current iteration, totalDataWritten - currentPos gives the + // amount of data already written to the buffer + + // In the retryPath, the total data to be written will always be equal + // to or less than the max length of the buffer allocated. + // The len specified here is the combined sum of the data length of + // the buffers + Preconditions.checkState(!retry || len <= streamBufferMaxSize); + int dataWritten = (int) (current.getWrittenDataLength() - currentPos); + writeLen = retry ? (int) len : dataWritten; + // In retry path, the data written is already accounted in offset. + if (!retry) { + offset += writeLen; + } + LOG.debug("writeLen {}, total len {}", writeLen, len); + handleException(current, currentStreamIndex, ioe); } - LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, currentStreamIndex, ioe); + if (current.getRemaining() <= 0) { + // since the current block is already written close the stream. + handleFlushOrClose(StreamAction.FULL); + } + len -= writeLen; + off += writeLen; + } catch (Exception e) { + markStreamClosed(); + throw e; } - if (current.getRemaining() <= 0) { - // since the current block is already written close the stream. - handleFlushOrClose(StreamAction.FULL); - } - len -= writeLen; - off += writeLen; } } @@ -365,7 +371,7 @@ private void discardPreallocatedBlocks(long containerID, // pre allocated blocks available. // This will be called only to discard the next subsequent unused blocks - // in the sreamEntryList. + // in the streamEntryList. if (streamIndex < streamEntries.size()) { ListIterator streamEntryIterator = streamEntries.listIterator(streamIndex); @@ -398,6 +404,20 @@ private void removeEmptyBlocks() { } } } + + private void cleanup() { + if (excludeList != null) { + excludeList.clear(); + excludeList = null; + } + if (bufferPool != null) { + bufferPool.clearBufferPool(); + } + + if (streamEntries != null) { + streamEntries.clear(); + } + } /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in @@ -418,8 +438,7 @@ private void handleException(BlockOutputStreamEntry streamEntry, closedContainerException = checkIfContainerIsClosed(t); } PipelineID pipelineId = null; - long totalSuccessfulFlushedData = - streamEntry.getTotalAckDataLength(); + long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); //set the correct length for the current stream streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long bufferedDataLen = computeBufferData(); @@ -450,8 +469,8 @@ private void handleException(BlockOutputStreamEntry streamEntry, if (closedContainerException) { // discard subsequent pre allocated blocks from the streamEntries list // from the closed container - discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), - null, streamIndex + 1); + discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null, + streamIndex + 1); } else { // In case there is timeoutException or Watch for commit happening over // majority or the client connection failure to the leader in the @@ -475,6 +494,11 @@ private void handleException(BlockOutputStreamEntry streamEntry, } } + private void markStreamClosed() { + cleanup(); + closed = true; + } + private void handleRetry(IOException exception, long len) throws IOException { RetryPolicy.RetryAction action; try { @@ -586,40 +610,46 @@ private void handleFlushOrClose(StreamAction op) throws IOException { return; } while (true) { - int size = streamEntries.size(); - int streamIndex = - currentStreamIndex >= size ? size - 1 : currentStreamIndex; - BlockOutputStreamEntry entry = streamEntries.get(streamIndex); - if (entry != null) { - try { - Collection failedServers = entry.getFailedServers(); - // failed servers can be null in case there is no data written in the - // stream - if (failedServers != null && !failedServers.isEmpty()) { - excludeList.addDatanodes(failedServers); - } - switch (op) { - case CLOSE: - entry.close(); - break; - case FULL: - if (entry.getRemaining() == 0) { - entry.close(); - currentStreamIndex++; + try { + int size = streamEntries.size(); + int streamIndex = + currentStreamIndex >= size ? size - 1 : currentStreamIndex; + BlockOutputStreamEntry entry = streamEntries.get(streamIndex); + if (entry != null) { + try { + Collection failedServers = + entry.getFailedServers(); + // failed servers can be null in case there is no data written in the + // stream + if (failedServers != null && !failedServers.isEmpty()) { + excludeList.addDatanodes(failedServers); } - break; - case FLUSH: - entry.flush(); - break; - default: - throw new IOException("Invalid Operation"); + switch (op) { + case CLOSE: + entry.close(); + break; + case FULL: + if (entry.getRemaining() == 0) { + entry.close(); + currentStreamIndex++; + } + break; + case FLUSH: + entry.flush(); + break; + default: + throw new IOException("Invalid Operation"); + } + } catch (IOException ioe) { + handleException(entry, streamIndex, ioe); + continue; } - } catch (IOException ioe) { - handleException(entry, streamIndex, ioe); - continue; } + break; + } catch (Exception e) { + markStreamClosed(); + throw e; } - break; } } @@ -658,7 +688,7 @@ public void close() throws IOException { } catch (IOException ioe) { throw ioe; } finally { - bufferPool.clearBufferPool(); + cleanup(); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 32bef12ae7..399b977d33 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -189,6 +189,7 @@ public void testBufferCaching() throws Exception { // flush ensures watchForCommit updates the total length acknowledged Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); @@ -208,7 +209,7 @@ public void testBufferCaching() throws Exception { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -263,6 +264,7 @@ public void testFlushChunk() throws Exception { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -302,7 +304,7 @@ public void testFlushChunk() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -397,6 +399,7 @@ public void testMultiChunkWrite() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -454,6 +457,7 @@ public void testMultiChunkWrite2() throws Exception { blockOutputStream.getCommitIndex2flushedDataMap().size()); Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); key.close(); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -471,7 +475,7 @@ public void testMultiChunkWrite2() throws Exception { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -536,6 +540,7 @@ public void testFullBufferCondition() throws Exception { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -570,7 +575,7 @@ public void testFullBufferCondition() throws Exception { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -638,6 +643,7 @@ public void testWriteWithExceedingMaxBufferLimit() throws Exception { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -673,7 +679,7 @@ public void testWriteWithExceedingMaxBufferLimit() throws Exception { metrics.getTotalOpCount()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index f228dad69e..89a2af966a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -234,6 +234,7 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { // and one flush for partial chunk key.flush(); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); @@ -249,7 +250,7 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -372,6 +373,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception { key.flush(); Assert.assertEquals(2, raftClient.getCommitInfoMap().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); Assert @@ -382,7 +384,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -515,13 +517,14 @@ public void test2DatanodesFailure() throws Exception { // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // now close the stream, It will update the ack length after watchForCommit + + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); key.close(); Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -538,7 +541,7 @@ public void test2DatanodesFailure() throws Exception { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -637,6 +640,7 @@ public void testFailureWithPrimeSizedData() throws Exception { // and one flush for partial chunk key.flush(); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled @@ -652,7 +656,6 @@ public void testFailureWithPrimeSizedData() throws Exception { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -663,7 +666,7 @@ public void testFailureWithPrimeSizedData() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); @@ -774,7 +777,6 @@ public void testExceptionDuringClose() throws Exception { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -785,7 +787,7 @@ public void testExceptionDuringClose() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); @@ -911,6 +913,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // commitInfoMap will remain intact as there is no server failure Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); // make sure the bufferPool is empty @@ -919,7 +922,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1046,6 +1049,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); Assert @@ -1054,7 +1058,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1071,6 +1075,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); @@ -1198,7 +1203,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 381cf14e4c..5cb6dbc047 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -19,10 +19,12 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -66,6 +68,7 @@ public class TestOzoneClientRetriesOnException { private String volumeName; private String bucketName; private String keyString; + private XceiverClientManager xceiverClientManager; /** * Create a MiniDFSCluster for testing. @@ -84,8 +87,6 @@ public void init() throws Exception { conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2); - conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s"); conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) @@ -100,6 +101,7 @@ public void init() throws Exception { //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); objectStore = client.getObjectStore(); + xceiverClientManager = new XceiverClientManager(conf); keyString = UUID.randomUUID().toString(); volumeName = "testblockoutputstreamwithretries"; bucketName = volumeName; @@ -152,8 +154,9 @@ public void testGroupMismatchExceptionHandling() throws Exception { .getIoException()) instanceof GroupMismatchException); Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() .contains(pipeline.getId())); - key.close(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + key.close(); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); validateData(keyName, data1); } @@ -171,13 +174,8 @@ public void testMaxRetriesByOzoneClient() throws Exception { byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) .getBytes(UTF_8); - key.write(data1); - - OutputStream stream = entries.get(0).getOutputStream(); - Assert.assertTrue(stream instanceof BlockOutputStream); - BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - List pipelineList = new ArrayList<>(); long containerID; + List containerList = new ArrayList<>(); for (BlockOutputStreamEntry entry : entries) { containerID = entry.getBlockID().getContainerID(); ContainerInfo container = @@ -186,18 +184,40 @@ public void testMaxRetriesByOzoneClient() throws Exception { Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager() .getPipeline(container.getPipelineID()); - pipelineList.add(pipeline.getId()); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + if (!containerList.contains(containerID)) { + xceiverClient.sendCommand(ContainerTestHelper + .getCreateContainerRequest(containerID, pipeline)); + } + xceiverClientManager.releaseClient(xceiverClient, false); } - ContainerTestHelper.waitForPipelineClose(key, cluster, false); + key.write(data1); + OutputStream stream = entries.get(0).getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + ContainerTestHelper.waitForContainerClose(key, cluster); try { key.write(data1); + Assert.fail("Expected exception not thrown"); } catch (IOException ioe) { Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream - .getIoException()) instanceof GroupMismatchException); + .getIoException()) instanceof ContainerNotOpenException); Assert.assertTrue(ioe.getMessage().contains( "Retry request failed. retries get failed due to exceeded maximum " + "allowed retries number: 3")); } + try { + key.flush(); + Assert.fail("Expected exception not thrown"); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getMessage().contains("Stream is closed")); + } + try { + key.close(); + } catch (IOException ioe) { + Assert.fail("Expected should not be thrown"); + } } private OzoneOutputStream createKey(String keyName, ReplicationType type, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 93807b4c2c..a1fd17ccd6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -727,7 +727,9 @@ public static void waitForContainerClose(OzoneOutputStream outputStream, keyOutputStream.getLocationInfoList(); List containerIdList = new ArrayList<>(); for (OmKeyLocationInfo info : locationInfoList) { - containerIdList.add(info.getContainerID()); + long id = info.getContainerID(); + if (!containerIdList.contains(id)) + containerIdList.add(id); } Assert.assertTrue(!containerIdList.isEmpty()); waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));