HDDS-1337. Handle GroupMismatchException in OzoneClient. Contributed by Shashikant Banerjee.

This commit is contained in:
Yiqun Lin 2019-04-02 16:27:11 +08:00
parent 61d19110d4
commit d31c86892e
13 changed files with 359 additions and 43 deletions

View File

@ -55,6 +55,7 @@
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
@ -100,7 +101,7 @@ public class BlockOutputStream extends OutputStream {
// The IOException will be set by response handling thread in case there is an // The IOException will be set by response handling thread in case there is an
// exception received in the response. If the exception is set, the next // exception received in the response. If the exception is set, the next
// request will fail upfront. // request will fail upfront.
private IOException ioException; private AtomicReference<IOException> ioException;
private ExecutorService responseExecutor; private ExecutorService responseExecutor;
// the effective length of data flushed so far // the effective length of data flushed so far
@ -187,6 +188,7 @@ public BlockOutputStream(BlockID blockID, String key,
writtenDataLength = 0; writtenDataLength = 0;
failedServers = Collections.emptyList(); failedServers = Collections.emptyList();
bufferList = null; bufferList = null;
ioException = new AtomicReference<>(null);
} }
@ -221,9 +223,8 @@ public BufferPool getBufferPool() {
return bufferPool; return bufferPool;
} }
@VisibleForTesting
public IOException getIoException() { public IOException getIoException() {
return ioException; return ioException.get();
} }
@VisibleForTesting @VisibleForTesting
@ -372,10 +373,9 @@ private void handleFullBuffer() throws IOException {
waitOnFlushFutures(); waitOnFlushFutures();
} }
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
ioException = new IOException( setIoException(e);
"Unexpected Storage Container Exception: " + e.toString(), e);
adjustBuffersOnException(); adjustBuffersOnException();
throw ioException; throw getIoException();
} }
if (!commitIndex2flushedDataMap.isEmpty()) { if (!commitIndex2flushedDataMap.isEmpty()) {
watchForCommit( watchForCommit(
@ -430,9 +430,9 @@ private void watchForCommit(long commitIndex) throws IOException {
adjustBuffers(index); adjustBuffers(index);
} catch (TimeoutException | InterruptedException | ExecutionException e) { } catch (TimeoutException | InterruptedException | ExecutionException e) {
LOG.warn("watchForCommit failed for index " + commitIndex, e); LOG.warn("watchForCommit failed for index " + commitIndex, e);
setIoException(e);
adjustBuffersOnException(); adjustBuffersOnException();
throw new IOException( throw getIoException();
"Unexpected Storage Container Exception: " + e.toString(), e);
} }
} }
@ -461,7 +461,7 @@ ContainerCommandResponseProto> executePutBlock()
throw new CompletionException(sce); throw new CompletionException(sce);
} }
// if the ioException is not set, putBlock is successful // if the ioException is not set, putBlock is successful
if (ioException == null) { if (getIoException() == null) {
BlockID responseBlockID = BlockID.getFromProtobuf( BlockID responseBlockID = BlockID.getFromProtobuf(
e.getPutBlock().getCommittedBlockLength().getBlockID()); e.getPutBlock().getCommittedBlockLength().getBlockID());
Preconditions.checkState(blockID.getContainerBlockID() Preconditions.checkState(blockID.getContainerBlockID()
@ -505,10 +505,9 @@ public void flush() throws IOException {
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
// just set the exception here as well in order to maintain sanctity of // just set the exception here as well in order to maintain sanctity of
// ioException field // ioException field
ioException = new IOException( setIoException(e);
"Unexpected Storage Container Exception: " + e.toString(), e);
adjustBuffersOnException(); adjustBuffersOnException();
throw ioException; throw getIoException();
} }
} }
} }
@ -580,10 +579,9 @@ public void close() throws IOException {
try { try {
handleFlush(); handleFlush();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
ioException = new IOException( setIoException(e);
"Unexpected Storage Container Exception: " + e.toString(), e);
adjustBuffersOnException(); adjustBuffersOnException();
throw ioException; throw getIoException();
} finally { } finally {
cleanup(false); cleanup(false);
} }
@ -611,8 +609,9 @@ private void validateResponse(
// if the ioException is already set, it means a prev request has failed // if the ioException is already set, it means a prev request has failed
// just throw the exception. The current operation will fail with the // just throw the exception. The current operation will fail with the
// original error // original error
if (ioException != null) { IOException exception = getIoException();
throw ioException; if (exception != null) {
throw exception;
} }
ContainerProtocolCalls.validateContainerResponse(responseProto); ContainerProtocolCalls.validateContainerResponse(responseProto);
} catch (StorageContainerException sce) { } catch (StorageContainerException sce) {
@ -622,10 +621,12 @@ private void validateResponse(
} }
} }
private void setIoException(Exception e) { private void setIoException(Exception e) {
if (ioException != null) { if (getIoException() == null) {
ioException = new IOException( IOException exception = new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e); "Unexpected Storage Container Exception: " + e.toString(), e);
ioException.compareAndSet(null, exception);
} }
} }
@ -659,9 +660,9 @@ public void cleanup(boolean invalidateClient) {
private void checkOpen() throws IOException { private void checkOpen() throws IOException {
if (xceiverClient == null) { if (xceiverClient == null) {
throw new IOException("BlockOutputStream has been closed."); throw new IOException("BlockOutputStream has been closed.");
} else if (ioException != null) { } else if (getIoException() != null) {
adjustBuffersOnException(); adjustBuffersOnException();
throw ioException; throw getIoException();
} }
} }

View File

@ -572,6 +572,17 @@ public List<PipelineReport> getPipelineReport() {
} }
} }
@VisibleForTesting
public List<PipelineID> getPipelineIds() {
Iterable<RaftGroupId> gids = server.getGroupIds();
List<PipelineID> pipelineIDs = new ArrayList<>();
for (RaftGroupId groupId : gids) {
pipelineIDs.add(PipelineID.valueOf(groupId.getUuid()));
LOG.info("pipeline id {}", PipelineID.valueOf(groupId.getUuid()));
}
return pipelineIDs;
}
void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) { void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
handlePipelineFailure(group.getGroupId(), roleInfoProto); handlePipelineFailure(group.getGroupId(), roleInfoProto);
} }

View File

@ -46,7 +46,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hdds.version>0.5.0-SNAPSHOT</hdds.version> <hdds.version>0.5.0-SNAPSHOT</hdds.version>
<!-- Apache Ratis version --> <!-- Apache Ratis version -->
<ratis.version>0.4.0-1fc5ace-SNAPSHOT</ratis.version> <ratis.version>0.4.0-8fed368-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version> <bouncycastle.version>1.60</bouncycastle.version>

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.rest.response.*; import org.apache.hadoop.ozone.client.rest.response.*;
import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.protocol.RaftRetryFailureException;
import java.util.ArrayList; import java.util.ArrayList;
@ -43,6 +44,7 @@ private OzoneClientUtils() {}
add(ContainerNotOpenException.class); add(ContainerNotOpenException.class);
add(RaftRetryFailureException.class); add(RaftRetryFailureException.class);
add(AlreadyClosedException.class); add(AlreadyClosedException.class);
add(GroupMismatchException.class);
}}; }};
/** /**
* Returns a BucketInfo object constructed using fields of the input * Returns a BucketInfo object constructed using fields of the input

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.protocol.RaftRetryFailureException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -423,8 +424,8 @@ private void handleException(BlockOutputStreamEntry streamEntry,
streamEntry.setCurrentPosition(totalSuccessfulFlushedData); streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = computeBufferData(); long bufferedDataLen = computeBufferData();
LOG.warn("Encountered exception {}. The last committed block length is {}, " LOG.warn("Encountered exception {}. The last committed block length is {}, "
+ "uncommitted data length is {}", exception, + "uncommitted data length is {} retry count {}", exception,
totalSuccessfulFlushedData, bufferedDataLen); totalSuccessfulFlushedData, bufferedDataLen, retryCount);
Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize); Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen); Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen);
long containerId = streamEntry.getBlockID().getContainerID(); long containerId = streamEntry.getBlockID().getContainerID();
@ -435,7 +436,8 @@ private void handleException(BlockOutputStreamEntry streamEntry,
} }
if (closedContainerException) { if (closedContainerException) {
excludeList.addConatinerId(ContainerID.valueof(containerId)); excludeList.addConatinerId(ContainerID.valueof(containerId));
} else if (retryFailure || t instanceof TimeoutException) { } else if (retryFailure || t instanceof TimeoutException
|| t instanceof GroupMismatchException) {
pipelineId = streamEntry.getPipeline().getId(); pipelineId = streamEntry.getPipeline().getId();
excludeList.addPipeline(pipelineId); excludeList.addPipeline(pipelineId);
} }
@ -482,11 +484,12 @@ private void handleRetry(IOException exception, long len) throws IOException {
throw e instanceof IOException ? (IOException) e : new IOException(e); throw e instanceof IOException ? (IOException) e : new IOException(e);
} }
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
String msg = "";
if (action.reason != null) { if (action.reason != null) {
LOG.error("Retry request failed. " + action.reason, msg = "Retry request failed. " + action.reason;
exception); LOG.error(msg, exception);
} }
throw exception; throw new IOException(msg, exception);
} }
// Throw the exception if the thread is interrupted // Throw the exception if the thread is interrupted

View File

@ -181,6 +181,8 @@ void restartStorageContainerManager() throws InterruptedException,
void restartHddsDatanode(int i, boolean waitForDatanode) void restartHddsDatanode(int i, boolean waitForDatanode)
throws InterruptedException, TimeoutException; throws InterruptedException, TimeoutException;
int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException;
/** /**
* Restart a particular HddsDatanode. * Restart a particular HddsDatanode.
* *

View File

@ -192,7 +192,8 @@ public List<HddsDatanodeService> getHddsDatanodes() {
return hddsDatanodes; return hddsDatanodes;
} }
private int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException { @Override
public int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException {
for (HddsDatanodeService service : hddsDatanodes) { for (HddsDatanodeService service : hddsDatanodes) {
if (service.getDatanodeDetails().equals(dn)) { if (service.getDatanodeDetails().equals(dn)) {
return hddsDatanodes.indexOf(service); return hddsDatanodes.indexOf(service);

View File

@ -24,7 +24,8 @@
import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -35,7 +36,7 @@
import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.RaftRetryFailureException;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -510,7 +511,7 @@ public void test2DatanodesFailure() throws Exception {
// and one flush for partial chunk // and one flush for partial chunk
key.flush(); key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof AlreadyClosedException); .getIoException()) instanceof RaftRetryFailureException);
// Make sure the retryCount is reset after the exception is handled // Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0); Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream, It will update the ack length after watchForCommit // now close the stream, It will update the ack length after watchForCommit
@ -1041,7 +1042,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
key.flush(); key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof AlreadyClosedException); .getIoException()) instanceof RaftRetryFailureException);
Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
// Make sure the retryCount is reset after the exception is handled // Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0); Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
@ -1183,7 +1184,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception {
key.flush(); key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof AlreadyClosedException); .getIoException()) instanceof RaftRetryFailureException);
// Make sure the retryCount is reset after the exception is handled // Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0); Assert.assertTrue(keyOutputStream.getRetryCount() == 0);

View File

@ -139,7 +139,13 @@ public void testContainerStateMachineFailures() throws Exception {
.getContainer(omKeyLocationInfo.getContainerID()).getContainerData() .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
.getContainerPath())); .getContainerPath()));
try {
key.close(); key.close();
Assert.fail();
} catch (IOException ioe) {
Assert.assertTrue(ioe.getMessage().contains(
"Requested operation not allowed as ContainerState is UNHEALTHY"));
}
// Make sure the container is marked unhealthy // Make sure the container is marked unhealthy
Assert.assertTrue( Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()

View File

@ -142,7 +142,13 @@ public void testContainerStateMachineFailures() throws Exception {
.getContainer().getContainerSet() .getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID()).getContainerData() .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
.getContainerPath())); .getContainerPath()));
try {
key.close(); key.close();
Assert.fail();
} catch (IOException ioe) {
Assert.assertTrue(ioe.getMessage().contains(
"Requested operation not allowed as ContainerState is UNHEALTHY"));
}
long containerID = omKeyLocationInfo.getContainerID(); long containerID = omKeyLocationInfo.getContainerID();
// Make sure the container is marked unhealthy // Make sure the container is marked unhealthy

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.ratis.protocol.GroupMismatchException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests failure detection and handling in BlockOutputStream Class.
*/
public class TestOzoneClientRetriesOnException {
private static MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
private OzoneClient client;
private ObjectStore objectStore;
private int chunkSize;
private int flushSize;
private int maxFlushSize;
private int blockSize;
private String volumeName;
private String bucketName;
private String keyString;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
@Before
public void init() throws Exception {
chunkSize = 100;
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s");
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
volumeName = "testblockoutputstreamwithretries";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
private String getKeyName() {
return UUID.randomUUID().toString();
}
/**
* Shutdown MiniDFSCluster.
*/
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testGroupMismatchExceptionHandling() throws Exception {
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
long containerID =
keyOutputStream.getStreamEntries().get(0).getBlockID().getContainerID();
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager()
.getContainer(ContainerID.valueof(containerID));
Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID());
ContainerTestHelper.waitForPipelineClose(key, cluster, true);
key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof GroupMismatchException);
Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
.contains(pipeline.getId()));
key.close();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
validateData(keyName, data1);
}
@Test
public void testMaxRetriesByOzoneClient() throws Exception {
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 4 * blockSize);
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
List<BlockOutputStreamEntry> entries = keyOutputStream.getStreamEntries();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 4);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
OutputStream stream = entries.get(0).getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
List<PipelineID> pipelineList = new ArrayList<>();
long containerID;
for (BlockOutputStreamEntry entry : entries) {
containerID = entry.getBlockID().getContainerID();
ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager()
.getContainer(ContainerID.valueof(containerID));
Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID());
pipelineList.add(pipeline.getId());
}
ContainerTestHelper.waitForPipelineClose(key, cluster, false);
try {
key.write(data1);
} catch (IOException ioe) {
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof GroupMismatchException);
Assert.assertTrue(ioe.getMessage().contains(
"Retry request failed. retries get failed due to exceeded maximum "
+ "allowed retries number: 3"));
}
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
return ContainerTestHelper
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}
private void validateData(String keyName, byte[] data) throws Exception {
ContainerTestHelper
.validateData(keyName, data, objectStore, volumeName, bucketName);
}
}

View File

@ -66,6 +66,8 @@
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -740,8 +742,76 @@ public static void waitForContainerClose(OzoneOutputStream outputStream,
containerIdList.add(info.getContainerID()); containerIdList.add(info.getContainerID());
} }
Assert.assertTrue(!containerIdList.isEmpty()); Assert.assertTrue(!containerIdList.isEmpty());
ContainerTestHelper waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
.waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); }
public static void waitForPipelineClose(OzoneOutputStream outputStream,
MiniOzoneCluster cluster, boolean waitForContainerCreation)
throws Exception {
KeyOutputStream keyOutputStream =
(KeyOutputStream) outputStream.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
keyOutputStream.getLocationInfoList();
List<Long> containerIdList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
containerIdList.add(info.getContainerID());
}
Assert.assertTrue(!containerIdList.isEmpty());
waitForPipelineClose(cluster, waitForContainerCreation,
containerIdList.toArray(new Long[0]));
}
public static void waitForPipelineClose(MiniOzoneCluster cluster,
boolean waitForContainerCreation, Long... containerIdList)
throws TimeoutException, InterruptedException, IOException {
List<Pipeline> pipelineList = new ArrayList<>();
for (long containerID : containerIdList) {
ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager()
.getContainer(ContainerID.valueof(containerID));
Pipeline pipeline =
cluster.getStorageContainerManager().getPipelineManager()
.getPipeline(container.getPipelineID());
if (!pipelineList.contains(pipeline)) {
pipelineList.add(pipeline);
}
List<DatanodeDetails> datanodes = pipeline.getNodes();
if (waitForContainerCreation) {
for (DatanodeDetails details : datanodes) {
// Client will issue write chunk and it will create the container on
// datanodes.
// wait for the container to be created
GenericTestUtils
.waitFor(() -> isContainerPresent(cluster, containerID, details),
500, 100 * 1000);
Assert.assertTrue(isContainerPresent(cluster, containerID, details));
// make sure the container gets created first
Assert.assertFalse(ContainerTestHelper
.isContainerClosed(cluster, containerID, details));
}
}
}
for (Pipeline pipeline1 : pipelineList) {
// issue pipeline destroy command
cluster.getStorageContainerManager().getPipelineManager()
.finalizeAndDestroyPipeline(pipeline1, false);
}
// wait for the pipeline to get destroyed in the datanodes
for (Pipeline pipeline : pipelineList) {
for (DatanodeDetails dn : pipeline.getNodes()) {
XceiverServerSpi server =
cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn))
.getDatanodeStateMachine().getContainer().getWriteChannel();
Assert.assertTrue(server instanceof XceiverServerRatis);
XceiverServerRatis raftServer = (XceiverServerRatis) server;
GenericTestUtils.waitFor(
() -> (!raftServer.getPipelineIds().contains(pipeline.getId())),
500, 100 * 1000);
}
}
} }
public static void waitForContainerClose(MiniOzoneCluster cluster, public static void waitForContainerClose(MiniOzoneCluster cluster,
@ -785,13 +855,13 @@ public static void waitForContainerClose(MiniOzoneCluster cluster,
// but not yet been used by the client. In such a case container is never // but not yet been used by the client. In such a case container is never
// created. // created.
for (DatanodeDetails datanodeDetails : datanodes) { for (DatanodeDetails datanodeDetails : datanodes) {
GenericTestUtils.waitFor(() -> ContainerTestHelper GenericTestUtils.waitFor(
.isContainerClosed(cluster, containerID, datanodeDetails), 500, () -> isContainerClosed(cluster, containerID, datanodeDetails), 500,
15 * 1000); 15 * 1000);
//double check if it's really closed //double check if it's really closed
// (waitFor also throws an exception) // (waitFor also throws an exception)
Assert.assertTrue(ContainerTestHelper Assert.assertTrue(
.isContainerClosed(cluster, containerID, datanodeDetails)); isContainerClosed(cluster, containerID, datanodeDetails));
} }
index++; index++;
} }

View File

@ -29,7 +29,7 @@
<hadoop.version>3.2.0</hadoop.version> <hadoop.version>3.2.0</hadoop.version>
<hdds.version>0.5.0-SNAPSHOT</hdds.version> <hdds.version>0.5.0-SNAPSHOT</hdds.version>
<ozone.version>0.5.0-SNAPSHOT</ozone.version> <ozone.version>0.5.0-SNAPSHOT</ozone.version>
<ratis.version>0.4.0-1fc5ace-SNAPSHOT</ratis.version> <ratis.version>0.4.0-8fed368-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version> <bouncycastle.version>1.60</bouncycastle.version>
<ozone.release>Crater Lake</ozone.release> <ozone.release>Crater Lake</ozone.release>
<declared.ozone.version>${ozone.version}</declared.ozone.version> <declared.ozone.version>${ozone.version}</declared.ozone.version>