HDDS-1373. KeyOutputStream, close after write request fails after retries, runs into IllegalArgumentException. Contributed by Shashikant Banerjee

This commit is contained in:
Shashikant Banerjee 2019-04-11 22:13:41 +05:30
parent 2d4f6b6daa
commit 22d0468b39
7 changed files with 696 additions and 115 deletions

View File

@ -102,4 +102,10 @@ public static ExcludeList getFromProtoBuf(
}); });
return excludeList; return excludeList;
} }
public void clear() {
datanodes.clear();
containerIds.clear();
pipelineIds.clear();
}
} }

View File

@ -295,60 +295,66 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
throws IOException { throws IOException {
int succeededAllocates = 0; int succeededAllocates = 0;
while (len > 0) { while (len > 0) {
if (streamEntries.size() <= currentStreamIndex) {
Preconditions.checkNotNull(omClient);
// allocate a new block, if a exception happens, log an error and
// throw exception to the caller directly, and the write fails.
try {
allocateNewBlock(currentStreamIndex);
succeededAllocates += 1;
} catch (IOException ioe) {
LOG.error("Try to allocate more blocks for write failed, already "
+ "allocated " + succeededAllocates + " blocks for this write.");
throw ioe;
}
}
// in theory, this condition should never violate due the check above
// still do a sanity check.
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
// length(len) will be in int range if the call is happening through
// write API of blockOutputStream. Length can be in long range if it comes
// via Exception path.
int writeLen = Math.min((int)len, (int) current.getRemaining());
long currentPos = current.getWrittenDataLength();
try { try {
if (retry) { if (streamEntries.size() <= currentStreamIndex) {
current.writeOnRetry(len); Preconditions.checkNotNull(omClient);
} else { // allocate a new block, if a exception happens, log an error and
current.write(b, off, writeLen); // throw exception to the caller directly, and the write fails.
offset += writeLen; try {
allocateNewBlock(currentStreamIndex);
succeededAllocates += 1;
} catch (IOException ioe) {
LOG.error("Try to allocate more blocks for write failed, already "
+ "allocated " + succeededAllocates
+ " blocks for this write.");
throw ioe;
}
} }
} catch (IOException ioe) { // in theory, this condition should never violate due the check above
// for the current iteration, totalDataWritten - currentPos gives the // still do a sanity check.
// amount of data already written to the buffer Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
// In the retryPath, the total data to be written will always be equal // length(len) will be in int range if the call is happening through
// to or less than the max length of the buffer allocated. // write API of blockOutputStream. Length can be in long range if it comes
// The len specified here is the combined sum of the data length of // via Exception path.
// the buffers int writeLen = Math.min((int) len, (int) current.getRemaining());
Preconditions.checkState(!retry || len <= streamBufferMaxSize); long currentPos = current.getWrittenDataLength();
int dataWritten = (int) (current.getWrittenDataLength() - currentPos); try {
writeLen = retry ? (int) len : dataWritten; if (retry) {
// In retry path, the data written is already accounted in offset. current.writeOnRetry(len);
if (!retry) { } else {
offset += writeLen; current.write(b, off, writeLen);
offset += writeLen;
}
} catch (IOException ioe) {
// for the current iteration, totalDataWritten - currentPos gives the
// amount of data already written to the buffer
// In the retryPath, the total data to be written will always be equal
// to or less than the max length of the buffer allocated.
// The len specified here is the combined sum of the data length of
// the buffers
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
writeLen = retry ? (int) len : dataWritten;
// In retry path, the data written is already accounted in offset.
if (!retry) {
offset += writeLen;
}
LOG.debug("writeLen {}, total len {}", writeLen, len);
handleException(current, currentStreamIndex, ioe);
} }
LOG.debug("writeLen {}, total len {}", writeLen, len); if (current.getRemaining() <= 0) {
handleException(current, currentStreamIndex, ioe); // since the current block is already written close the stream.
handleFlushOrClose(StreamAction.FULL);
}
len -= writeLen;
off += writeLen;
} catch (Exception e) {
markStreamClosed();
throw e;
} }
if (current.getRemaining() <= 0) {
// since the current block is already written close the stream.
handleFlushOrClose(StreamAction.FULL);
}
len -= writeLen;
off += writeLen;
} }
} }
@ -365,7 +371,7 @@ private void discardPreallocatedBlocks(long containerID,
// pre allocated blocks available. // pre allocated blocks available.
// This will be called only to discard the next subsequent unused blocks // This will be called only to discard the next subsequent unused blocks
// in the sreamEntryList. // in the streamEntryList.
if (streamIndex < streamEntries.size()) { if (streamIndex < streamEntries.size()) {
ListIterator<BlockOutputStreamEntry> streamEntryIterator = ListIterator<BlockOutputStreamEntry> streamEntryIterator =
streamEntries.listIterator(streamIndex); streamEntries.listIterator(streamIndex);
@ -398,6 +404,20 @@ private void removeEmptyBlocks() {
} }
} }
} }
private void cleanup() {
if (excludeList != null) {
excludeList.clear();
excludeList = null;
}
if (bufferPool != null) {
bufferPool.clearBufferPool();
}
if (streamEntries != null) {
streamEntries.clear();
}
}
/** /**
* It performs following actions : * It performs following actions :
* a. Updates the committed length at datanode for the current stream in * a. Updates the committed length at datanode for the current stream in
@ -418,8 +438,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
closedContainerException = checkIfContainerIsClosed(t); closedContainerException = checkIfContainerIsClosed(t);
} }
PipelineID pipelineId = null; PipelineID pipelineId = null;
long totalSuccessfulFlushedData = long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
streamEntry.getTotalAckDataLength();
//set the correct length for the current stream //set the correct length for the current stream
streamEntry.setCurrentPosition(totalSuccessfulFlushedData); streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = computeBufferData(); long bufferedDataLen = computeBufferData();
@ -450,8 +469,8 @@ private void handleException(BlockOutputStreamEntry streamEntry,
if (closedContainerException) { if (closedContainerException) {
// discard subsequent pre allocated blocks from the streamEntries list // discard subsequent pre allocated blocks from the streamEntries list
// from the closed container // from the closed container
discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null,
null, streamIndex + 1); streamIndex + 1);
} else { } else {
// In case there is timeoutException or Watch for commit happening over // In case there is timeoutException or Watch for commit happening over
// majority or the client connection failure to the leader in the // majority or the client connection failure to the leader in the
@ -475,6 +494,11 @@ private void handleException(BlockOutputStreamEntry streamEntry,
} }
} }
private void markStreamClosed() {
cleanup();
closed = true;
}
private void handleRetry(IOException exception, long len) throws IOException { private void handleRetry(IOException exception, long len) throws IOException {
RetryPolicy.RetryAction action; RetryPolicy.RetryAction action;
try { try {
@ -586,40 +610,46 @@ private void handleFlushOrClose(StreamAction op) throws IOException {
return; return;
} }
while (true) { while (true) {
int size = streamEntries.size(); try {
int streamIndex = int size = streamEntries.size();
currentStreamIndex >= size ? size - 1 : currentStreamIndex; int streamIndex =
BlockOutputStreamEntry entry = streamEntries.get(streamIndex); currentStreamIndex >= size ? size - 1 : currentStreamIndex;
if (entry != null) { BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
try { if (entry != null) {
Collection<DatanodeDetails> failedServers = entry.getFailedServers(); try {
// failed servers can be null in case there is no data written in the Collection<DatanodeDetails> failedServers =
// stream entry.getFailedServers();
if (failedServers != null && !failedServers.isEmpty()) { // failed servers can be null in case there is no data written in the
excludeList.addDatanodes(failedServers); // stream
} if (failedServers != null && !failedServers.isEmpty()) {
switch (op) { excludeList.addDatanodes(failedServers);
case CLOSE:
entry.close();
break;
case FULL:
if (entry.getRemaining() == 0) {
entry.close();
currentStreamIndex++;
} }
break; switch (op) {
case FLUSH: case CLOSE:
entry.flush(); entry.close();
break; break;
default: case FULL:
throw new IOException("Invalid Operation"); if (entry.getRemaining() == 0) {
entry.close();
currentStreamIndex++;
}
break;
case FLUSH:
entry.flush();
break;
default:
throw new IOException("Invalid Operation");
}
} catch (IOException ioe) {
handleException(entry, streamIndex, ioe);
continue;
} }
} catch (IOException ioe) {
handleException(entry, streamIndex, ioe);
continue;
} }
break;
} catch (Exception e) {
markStreamClosed();
throw e;
} }
break;
} }
} }
@ -658,7 +688,7 @@ public void close() throws IOException {
} catch (IOException ioe) { } catch (IOException ioe) {
throw ioe; throw ioe;
} finally { } finally {
bufferPool.clearBufferPool(); cleanup();
} }
} }

View File

@ -189,6 +189,7 @@ public void testBufferCaching() throws Exception {
// flush ensures watchForCommit updates the total length acknowledged // flush ensures watchForCommit updates the total length acknowledged
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit // now close the stream, It will update the ack length after watchForCommit
key.close(); key.close();
@ -208,7 +209,7 @@ public void testBufferCaching() throws Exception {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }
@ -263,6 +264,7 @@ public void testFlushChunk() throws Exception {
// Now do a flush. This will flush the data and update the flush length and // Now do a flush. This will flush the data and update the flush length and
// the map. // the map.
key.flush(); key.flush();
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
// flush is a sync call, all pending operations will complete // flush is a sync call, all pending operations will complete
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@ -302,7 +304,7 @@ public void testFlushChunk() throws Exception {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3, Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }
@ -397,6 +399,7 @@ public void testMultiChunkWrite() throws Exception {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3, Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }
@ -454,6 +457,7 @@ public void testMultiChunkWrite2() throws Exception {
blockOutputStream.getCommitIndex2flushedDataMap().size()); blockOutputStream.getCommitIndex2flushedDataMap().size());
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
key.close(); key.close();
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@ -471,7 +475,7 @@ public void testMultiChunkWrite2() throws Exception {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }
@ -536,6 +540,7 @@ public void testFullBufferCondition() throws Exception {
// Now do a flush. This will flush the data and update the flush length and // Now do a flush. This will flush the data and update the flush length and
// the map. // the map.
key.flush(); key.flush();
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -570,7 +575,7 @@ public void testFullBufferCondition() throws Exception {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }
@ -638,6 +643,7 @@ public void testWriteWithExceedingMaxBufferLimit() throws Exception {
// Now do a flush. This will flush the data and update the flush length and // Now do a flush. This will flush the data and update the flush length and
// the map. // the map.
key.flush(); key.flush();
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -673,7 +679,7 @@ public void testWriteWithExceedingMaxBufferLimit() throws Exception {
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }

View File

@ -234,6 +234,7 @@ public void testWatchForCommitWithCloseContainerException() throws Exception {
// and one flush for partial chunk // and one flush for partial chunk
key.flush(); key.flush();
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException); .getIoException()) instanceof ContainerNotOpenException);
@ -249,7 +250,7 @@ public void testWatchForCommitWithCloseContainerException() throws Exception {
Assert Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -372,6 +373,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception {
key.flush(); key.flush();
Assert.assertEquals(2, raftClient.getCommitInfoMap().size()); Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit // now close the stream, It will update the ack length after watchForCommit
key.close(); key.close();
Assert Assert
@ -382,7 +384,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception {
Assert Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -515,13 +517,14 @@ public void test2DatanodesFailure() throws Exception {
// Make sure the retryCount is reset after the exception is handled // Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0); Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream, It will update the ack length after watchForCommit // now close the stream, It will update the ack length after watchForCommit
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
key.close(); key.close();
Assert Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -538,7 +541,7 @@ public void test2DatanodesFailure() throws Exception {
Assert Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }
@ -637,6 +640,7 @@ public void testFailureWithPrimeSizedData() throws Exception {
// and one flush for partial chunk // and one flush for partial chunk
key.flush(); key.flush();
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException); .getIoException()) instanceof ContainerNotOpenException);
// Make sure the retryCount is reset after the exception is handled // Make sure the retryCount is reset after the exception is handled
@ -652,7 +656,6 @@ public void testFailureWithPrimeSizedData() throws Exception {
Assert Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -663,7 +666,7 @@ public void testFailureWithPrimeSizedData() throws Exception {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 9, Assert.assertEquals(totalOpCount + 9,
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
// Written the same data twice // Written the same data twice
String dataString = new String(data1, UTF_8); String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes()); validateData(keyName, dataString.concat(dataString).getBytes());
@ -774,7 +777,6 @@ public void testExceptionDuringClose() throws Exception {
Assert Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -785,7 +787,7 @@ public void testExceptionDuringClose() throws Exception {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 9, Assert.assertEquals(totalOpCount + 9,
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
// Written the same data twice // Written the same data twice
String dataString = new String(data1, UTF_8); String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes()); validateData(keyName, dataString.concat(dataString).getBytes());
@ -911,6 +913,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception {
Assert.assertTrue(keyOutputStream.getRetryCount() == 0); Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// commitInfoMap will remain intact as there is no server failure // commitInfoMap will remain intact as there is no server failure
Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit // now close the stream, It will update the ack length after watchForCommit
key.close(); key.close();
// make sure the bufferPool is empty // make sure the bufferPool is empty
@ -919,7 +922,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception {
Assert Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -1046,6 +1049,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
// Make sure the retryCount is reset after the exception is handled // Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0); Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit // now close the stream, It will update the ack length after watchForCommit
key.close(); key.close();
Assert Assert
@ -1054,7 +1058,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
Assert Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
@ -1071,6 +1075,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22, Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
// Written the same data twice // Written the same data twice
String dataString = new String(data1, UTF_8); String dataString = new String(data1, UTF_8);
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
@ -1198,7 +1203,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception {
Assert Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,

View File

@ -19,10 +19,12 @@
import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -66,6 +68,7 @@ public class TestOzoneClientRetriesOnException {
private String volumeName; private String volumeName;
private String bucketName; private String bucketName;
private String keyString; private String keyString;
private XceiverClientManager xceiverClientManager;
/** /**
* Create a MiniDFSCluster for testing. * Create a MiniDFSCluster for testing.
@ -84,8 +87,6 @@ public void init() throws Exception {
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s");
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
conf.setQuietMode(false); conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf) cluster = MiniOzoneCluster.newBuilder(conf)
@ -100,6 +101,7 @@ public void init() throws Exception {
//the easiest way to create an open container is creating a key //the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf); client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore(); objectStore = client.getObjectStore();
xceiverClientManager = new XceiverClientManager(conf);
keyString = UUID.randomUUID().toString(); keyString = UUID.randomUUID().toString();
volumeName = "testblockoutputstreamwithretries"; volumeName = "testblockoutputstreamwithretries";
bucketName = volumeName; bucketName = volumeName;
@ -152,8 +154,9 @@ public void testGroupMismatchExceptionHandling() throws Exception {
.getIoException()) instanceof GroupMismatchException); .getIoException()) instanceof GroupMismatchException);
Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
.contains(pipeline.getId())); .contains(pipeline.getId()));
key.close();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
key.close();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
validateData(keyName, data1); validateData(keyName, data1);
} }
@ -171,13 +174,8 @@ public void testMaxRetriesByOzoneClient() throws Exception {
byte[] data1 = byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength) ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8); .getBytes(UTF_8);
key.write(data1);
OutputStream stream = entries.get(0).getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
List<PipelineID> pipelineList = new ArrayList<>();
long containerID; long containerID;
List<Long> containerList = new ArrayList<>();
for (BlockOutputStreamEntry entry : entries) { for (BlockOutputStreamEntry entry : entries) {
containerID = entry.getBlockID().getContainerID(); containerID = entry.getBlockID().getContainerID();
ContainerInfo container = ContainerInfo container =
@ -186,18 +184,40 @@ public void testMaxRetriesByOzoneClient() throws Exception {
Pipeline pipeline = Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager() cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID()); .getPipeline(container.getPipelineID());
pipelineList.add(pipeline.getId()); XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
if (!containerList.contains(containerID)) {
xceiverClient.sendCommand(ContainerTestHelper
.getCreateContainerRequest(containerID, pipeline));
}
xceiverClientManager.releaseClient(xceiverClient, false);
} }
ContainerTestHelper.waitForPipelineClose(key, cluster, false); key.write(data1);
OutputStream stream = entries.get(0).getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
ContainerTestHelper.waitForContainerClose(key, cluster);
try { try {
key.write(data1); key.write(data1);
Assert.fail("Expected exception not thrown");
} catch (IOException ioe) { } catch (IOException ioe) {
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof GroupMismatchException); .getIoException()) instanceof ContainerNotOpenException);
Assert.assertTrue(ioe.getMessage().contains( Assert.assertTrue(ioe.getMessage().contains(
"Retry request failed. retries get failed due to exceeded maximum " "Retry request failed. retries get failed due to exceeded maximum "
+ "allowed retries number: 3")); + "allowed retries number: 3"));
} }
try {
key.flush();
Assert.fail("Expected exception not thrown");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getMessage().contains("Stream is closed"));
}
try {
key.close();
} catch (IOException ioe) {
Assert.fail("Expected should not be thrown");
}
} }
private OzoneOutputStream createKey(String keyName, ReplicationType type, private OzoneOutputStream createKey(String keyName, ReplicationType type,

View File

@ -0,0 +1,512 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.*;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* This class verifies the watchForCommit Handling by client.
*/
public class TestWatchForCommit {
private MiniOzoneCluster cluster;
private OzoneConfiguration conf;
private OzoneClient client;
private ObjectStore objectStore;
private String volumeName;
private String bucketName;
private String keyString;
private int chunkSize;
private int flushSize;
private int maxFlushSize;
private int blockSize;
private StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static String containerOwner = "OZONE";
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
private void startCluster(OzoneConfiguration conf) throws Exception {
chunkSize = 100;
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
1, TimeUnit.SECONDS);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
volumeName = "watchforcommithandlingtest";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
storageContainerLocationClient = cluster
.getStorageContainerLocationClient();
}
/**
* Shutdown MiniDFSCluster.
*/
private void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
private String getKeyName() {
return UUID.randomUUID().toString();
}
@Test
public void testWatchForCommitWithKeyWrite() throws Exception {
// in this case, watch request should fail with RaftRetryFailureException
// and will be captured in keyOutputStream and the failover will happen
// to a different block
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 10,
TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
startCluster(conf);
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes atleast putBlock for first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written data more than flush Size(2 chunks), at this time
// buffer pool will have 3 buffers allocated worth of chunk size
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// acked by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up atleast one entry from the map where each
// entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// flush will make sure one more entry gets updated in the map
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
Pipeline pipeline = raftClient.getPipeline();
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
// again write data with more than max buffer limit. This will call
// watchForCommit again. Since the commit will happen 2 way, the
// commitInfoMap will get updated for servers which are alive
// 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
// once exception is hit
key.write(data1);
// As a part of handling the exception, 4 failed writeChunks will be
// rewritten plus one partial chunk plus two putBlocks for flushSize
// and one flush for partial chunk
key.flush();
Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
.getIoException()) instanceof RaftRetryFailureException);
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 14,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 8,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
shutdown();
}
@Test
public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
startCluster(conf);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
containerOwner);
XceiverClientSpi client = clientManager
.acquireClient(container1.getPipeline());
Assert.assertEquals(1, client.getRefcount());
Assert.assertEquals(container1.getPipeline(),
client.getPipeline());
Pipeline pipeline = client.getPipeline();
XceiverClientReply reply =
client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
container1.getContainerInfo().getContainerID(),
client.getPipeline()));
reply.getResponse().get();
long index = reply.getLogIndex();
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
try {
// just watch for a lo index which in not updated in the commitInfo Map
client.watchForCommit(index + 1, 3000);
Assert.fail("expected exception not thrown");
} catch (Exception e) {
Assert.assertTrue(
HddsClientUtils.checkForException(e) instanceof TimeoutException);
}
// After releasing the client, this connection should be closed
// and any container operations should fail
clientManager.releaseClient(client, false);
shutdown();
}
@Test
public void testWatchForCommitForRetryfailure() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3);
startCluster(conf);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
containerOwner);
XceiverClientSpi client = clientManager
.acquireClient(container1.getPipeline());
Assert.assertEquals(1, client.getRefcount());
Assert.assertEquals(container1.getPipeline(),
client.getPipeline());
Pipeline pipeline = client.getPipeline();
XceiverClientReply reply =
client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
container1.getContainerInfo().getContainerID(),
client.getPipeline()));
reply.getResponse().get();
long index = reply.getLogIndex();
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
// again write data with more than max buffer limit. This wi
try {
// just watch for a lo index which in not updated in the commitInfo Map
client.watchForCommit(index + 1, 20000);
Assert.fail("expected exception not thrown");
} catch (Exception e) {
Assert.assertTrue(HddsClientUtils
.checkForException(e) instanceof RaftRetryFailureException);
}
clientManager.releaseClient(client, false);
shutdown();
}
@Test
public void test2WayCommitForRetryfailure() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3);
startCluster(conf);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
containerOwner);
XceiverClientSpi client = clientManager
.acquireClient(container1.getPipeline());
Assert.assertEquals(1, client.getRefcount());
Assert.assertEquals(container1.getPipeline(),
client.getPipeline());
Pipeline pipeline = client.getPipeline();
XceiverClientRatis ratisClient = (XceiverClientRatis) client;
XceiverClientReply reply =
client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
container1.getContainerInfo().getContainerID(),
client.getPipeline()));
reply.getResponse().get();
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
reply = client.sendCommandAsync(ContainerTestHelper
.getCloseContainer(pipeline,
container1.getContainerInfo().getContainerID()));
reply.getResponse().get();
client.watchForCommit(reply.getLogIndex(), 20000);
// commitInfo Map will be reduced to 2 here
Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
clientManager.releaseClient(client, false);
Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
Assert.assertTrue(
logCapturer.getOutput().contains("RaftRetryFailureException"));
Assert
.assertTrue(logCapturer.getOutput().contains("Committed by majority"));
logCapturer.stopCapturing();
shutdown();
}
@Test
public void test2WayCommitForTimeoutException() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
startCluster(conf);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
containerOwner);
XceiverClientSpi client = clientManager
.acquireClient(container1.getPipeline());
Assert.assertEquals(1, client.getRefcount());
Assert.assertEquals(container1.getPipeline(),
client.getPipeline());
Pipeline pipeline = client.getPipeline();
XceiverClientRatis ratisClient = (XceiverClientRatis) client;
XceiverClientReply reply =
client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
container1.getContainerInfo().getContainerID(),
client.getPipeline()));
reply.getResponse().get();
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
reply = client.sendCommandAsync(ContainerTestHelper
.getCloseContainer(pipeline,
container1.getContainerInfo().getContainerID()));
reply.getResponse().get();
client.watchForCommit(reply.getLogIndex(), 3000);
// commitInfo Map will be reduced to 2 here
Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
clientManager.releaseClient(client, false);
Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException"));
Assert
.assertTrue(logCapturer.getOutput().contains("Committed by majority"));
logCapturer.stopCapturing();
shutdown();
}
@Test
public void testWatchForCommitForGroupMismatchException() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
// mark the node stale early so that pipeline gets destroyed quickly
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
startCluster(conf);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
containerOwner);
XceiverClientSpi client = clientManager
.acquireClient(container1.getPipeline());
Assert.assertEquals(1, client.getRefcount());
Assert.assertEquals(container1.getPipeline(),
client.getPipeline());
Pipeline pipeline = client.getPipeline();
XceiverClientRatis ratisClient = (XceiverClientRatis) client;
long containerId = container1.getContainerInfo().getContainerID();
XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper
.getCreateContainerRequest(containerId, client.getPipeline()));
reply.getResponse().get();
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
List<Pipeline> pipelineList = new ArrayList<>();
pipelineList.add(pipeline);
ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
try {
// just watch for a lo index which in not updated in the commitInfo Map
//client.watchForCommit(reply.getLogIndex() + 1, 20000);
reply = client.sendCommandAsync(ContainerTestHelper
.getCreateContainerRequest(containerId, client.getPipeline()));
reply.getResponse().get();
Assert.fail("Expected exception not thrown");
} catch(Exception e) {
Assert.assertTrue(HddsClientUtils
.checkForException(e) instanceof GroupMismatchException);
}
clientManager.releaseClient(client, false);
shutdown();
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
return ContainerTestHelper
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}
private void validateData(String keyName, byte[] data) throws Exception {
ContainerTestHelper
.validateData(keyName, data, objectStore, volumeName, bucketName);
}
}

View File

@ -739,7 +739,9 @@ public static void waitForContainerClose(OzoneOutputStream outputStream,
keyOutputStream.getLocationInfoList(); keyOutputStream.getLocationInfoList();
List<Long> containerIdList = new ArrayList<>(); List<Long> containerIdList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) { for (OmKeyLocationInfo info : locationInfoList) {
containerIdList.add(info.getContainerID()); long id = info.getContainerID();
if (!containerIdList.contains(id))
containerIdList.add(id);
} }
Assert.assertTrue(!containerIdList.isEmpty()); Assert.assertTrue(!containerIdList.isEmpty());
waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));