HDDS-1317. KeyOutputStream#write throws ArrayIndexOutOfBoundsException when running RandomWrite MR examples. Contributed by Shashikant Banerjee.

This commit is contained in:
Shashikant Banerjee 2019-03-25 15:41:20 +05:30
parent 128dd91e10
commit d4e4a7d456
14 changed files with 1543 additions and 136 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdds.scm; package org.apache.hadoop.hdds.scm;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsSystem;
@ -37,7 +38,9 @@ public class XceiverClientMetrics {
.getSimpleName(); .getSimpleName();
private @Metric MutableCounterLong pendingOps; private @Metric MutableCounterLong pendingOps;
private @Metric MutableCounterLong totalOps;
private MutableCounterLong[] pendingOpsArray; private MutableCounterLong[] pendingOpsArray;
private MutableCounterLong[] opsArray;
private MutableRate[] containerOpsLatency; private MutableRate[] containerOpsLatency;
private MetricsRegistry registry; private MetricsRegistry registry;
@ -46,12 +49,17 @@ public XceiverClientMetrics() {
this.registry = new MetricsRegistry(SOURCE_NAME); this.registry = new MetricsRegistry(SOURCE_NAME);
this.pendingOpsArray = new MutableCounterLong[numEnumEntries]; this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
this.opsArray = new MutableCounterLong[numEnumEntries];
this.containerOpsLatency = new MutableRate[numEnumEntries]; this.containerOpsLatency = new MutableRate[numEnumEntries];
for (int i = 0; i < numEnumEntries; i++) { for (int i = 0; i < numEnumEntries; i++) {
pendingOpsArray[i] = registry.newCounter( pendingOpsArray[i] = registry.newCounter(
"numPending" + ContainerProtos.Type.forNumber(i + 1), "numPending" + ContainerProtos.Type.forNumber(i + 1),
"number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops", "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0); (long) 0);
opsArray[i] = registry
.newCounter("opCount" + ContainerProtos.Type.forNumber(i + 1),
"number of" + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0);
containerOpsLatency[i] = registry.newRate( containerOpsLatency[i] = registry.newRate(
ContainerProtos.Type.forNumber(i + 1) + "Latency", ContainerProtos.Type.forNumber(i + 1) + "Latency",
@ -68,6 +76,8 @@ public static XceiverClientMetrics create() {
public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) { public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
pendingOps.incr(); pendingOps.incr();
totalOps.incr();
opsArray[type.ordinal()].incr();
pendingOpsArray[type.ordinal()].incr(); pendingOpsArray[type.ordinal()].incr();
} }
@ -85,6 +95,16 @@ public long getContainerOpsMetrics(ContainerProtos.Type type) {
return pendingOpsArray[type.ordinal()].value(); return pendingOpsArray[type.ordinal()].value();
} }
@VisibleForTesting
public long getTotalOpCount() {
return totalOps.value();
}
@VisibleForTesting
public long getContainerOpCountMetrics(ContainerProtos.Type type) {
return opsArray[type.ordinal()].value();
}
public void unRegister() { public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance(); MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME); ms.unregisterSource(SOURCE_NAME);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm; package org.apache.hadoop.hdds.scm;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@ -25,6 +26,7 @@
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer; import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.protocol.RaftRetryFailureException;
@ -101,6 +103,8 @@ public static XceiverClientRatis newXceiverClientRatis(
// create a separate RaftClient for watchForCommit API // create a separate RaftClient for watchForCommit API
private RaftClient watchClient; private RaftClient watchClient;
private XceiverClientMetrics metrics;
/** /**
* Constructs a client. * Constructs a client.
*/ */
@ -116,6 +120,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
watchClient = null; watchClient = null;
this.tlsConfig = tlsConfig; this.tlsConfig = tlsConfig;
this.clientRequestTimeout = timeout; this.clientRequestTimeout = timeout;
metrics = XceiverClientManager.getXceiverClientMetrics();
} }
private void updateCommitInfosMap( private void updateCommitInfosMap(
@ -199,6 +204,12 @@ private RaftClient getClient() {
return Objects.requireNonNull(client.get(), "client is null"); return Objects.requireNonNull(client.get(), "client is null");
} }
@VisibleForTesting
public ConcurrentHashMap<UUID, Long> getCommitInfoMap() {
return commitInfoMap;
}
private CompletableFuture<RaftClientReply> sendRequestAsync( private CompletableFuture<RaftClientReply> sendRequestAsync(
ContainerCommandRequestProto request) { ContainerCommandRequestProto request) {
try (Scope scope = GlobalTracer.get() try (Scope scope = GlobalTracer.get()
@ -301,47 +312,52 @@ public XceiverClientReply watchForCommit(long index, long timeout)
public XceiverClientReply sendCommandAsync( public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request) { ContainerCommandRequestProto request) {
XceiverClientReply asyncReply = new XceiverClientReply(null); XceiverClientReply asyncReply = new XceiverClientReply(null);
long requestTime = Time.monotonicNowNanos();
CompletableFuture<RaftClientReply> raftClientReply = CompletableFuture<RaftClientReply> raftClientReply =
sendRequestAsync(request); sendRequestAsync(request);
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
CompletableFuture<ContainerCommandResponseProto> containerCommandResponse = CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
raftClientReply.whenComplete((reply, e) -> LOG.debug( raftClientReply.whenComplete((reply, e) -> {
"received reply {} for request: cmdType={} containerID={}" LOG.debug("received reply {} for request: cmdType={} containerID={}"
+ " pipelineID={} traceID={} exception: {}", reply, + " pipelineID={} traceID={} exception: {}", reply,
request.getCmdType(), request.getContainerID(), request.getCmdType(), request.getContainerID(),
request.getPipelineID(), request.getTraceID(), e)) request.getPipelineID(), request.getTraceID(), e);
.thenApply(reply -> { metrics.decrPendingContainerOpsMetrics(request.getCmdType());
try { metrics.addContainerOpsLatency(request.getCmdType(),
// we need to handle RaftRetryFailure Exception Time.monotonicNowNanos() - requestTime);
RaftRetryFailureException raftRetryFailureException = }).thenApply(reply -> {
reply.getRetryFailureException(); try {
if (raftRetryFailureException != null) { // we need to handle RaftRetryFailure Exception
// in case of raft retry failure, the raft client is RaftRetryFailureException raftRetryFailureException =
// not able to connect to the leader hence the pipeline reply.getRetryFailureException();
// can not be used but this instance of RaftClient will close if (raftRetryFailureException != null) {
// and refreshed again. In case the client cannot connect to // in case of raft retry failure, the raft client is
// leader, getClient call will fail. // not able to connect to the leader hence the pipeline
// can not be used but this instance of RaftClient will close
// and refreshed again. In case the client cannot connect to
// leader, getClient call will fail.
// No need to set the failed Server ID here. Ozone client // No need to set the failed Server ID here. Ozone client
// will directly exclude this pipeline in next allocate block // will directly exclude this pipeline in next allocate block
// to SCM as in this case, it is the raft client which is not // to SCM as in this case, it is the raft client which is not
// able to connect to leader in the pipeline, though the // able to connect to leader in the pipeline, though the
// pipeline can still be functional. // pipeline can still be functional.
throw new CompletionException(raftRetryFailureException); throw new CompletionException(raftRetryFailureException);
} }
ContainerCommandResponseProto response = ContainerCommandResponseProto response =
ContainerCommandResponseProto ContainerCommandResponseProto
.parseFrom(reply.getMessage().getContent()); .parseFrom(reply.getMessage().getContent());
UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId()); UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
if (response.getResult() == ContainerProtos.Result.SUCCESS) { if (response.getResult() == ContainerProtos.Result.SUCCESS) {
updateCommitInfosMap(reply.getCommitInfos()); updateCommitInfosMap(reply.getCommitInfos());
} }
asyncReply.setLogIndex(reply.getLogIndex()); asyncReply.setLogIndex(reply.getLogIndex());
addDatanodetoReply(serverId, asyncReply); addDatanodetoReply(serverId, asyncReply);
return response; return response;
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
throw new CompletionException(e); throw new CompletionException(e);
} }
}); });
asyncReply.setResponse(containerCommandResponse); asyncReply.setResponse(containerCommandResponse);
return asyncReply; return asyncReply;
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdds.scm.storage; package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@ -45,7 +46,15 @@
import java.util.UUID; import java.util.UUID;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.*; import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
@ -104,15 +113,25 @@ public class BlockOutputStream extends OutputStream {
// by all servers // by all servers
private long totalAckDataLength; private long totalAckDataLength;
// List containing buffers for which the putBlock call will
// update the length in the datanodes. This list will just maintain
// references to the buffers in the BufferPool which will be cleared
// when the watchForCommit acknowledges a putBlock logIndex has been
// committed on all datanodes. This list will be a place holder for buffers
// which got written between successive putBlock calls.
private List<ByteBuffer> bufferList;
// future Map to hold up all putBlock futures // future Map to hold up all putBlock futures
private ConcurrentHashMap<Long, private ConcurrentHashMap<Long,
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
futureMap; futureMap;
// map containing mapping for putBlock logIndex to to flushedDataLength Map.
// The map should maintain the keys (logIndexes) in order so that while // The map should maintain the keys (logIndexes) in order so that while
// removing we always end up updating incremented data flushed length. // removing we always end up updating incremented data flushed length.
private ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap; // Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
private ConcurrentSkipListMap<Long, List<ByteBuffer>>
commitIndex2flushedDataMap;
private List<DatanodeDetails> failedServers; private List<DatanodeDetails> failedServers;
@ -167,13 +186,15 @@ public BlockOutputStream(BlockID blockID, String key,
totalDataFlushedLength = 0; totalDataFlushedLength = 0;
writtenDataLength = 0; writtenDataLength = 0;
failedServers = Collections.emptyList(); failedServers = Collections.emptyList();
bufferList = null;
} }
public BlockID getBlockID() { public BlockID getBlockID() {
return blockID; return blockID;
} }
public long getTotalSuccessfulFlushedData() { public long getTotalAckDataLength() {
return totalAckDataLength; return totalAckDataLength;
} }
@ -185,6 +206,31 @@ public List<DatanodeDetails> getFailedServers() {
return failedServers; return failedServers;
} }
@VisibleForTesting
public XceiverClientSpi getXceiverClient() {
return xceiverClient;
}
@VisibleForTesting
public long getTotalDataFlushedLength() {
return totalDataFlushedLength;
}
@VisibleForTesting
public BufferPool getBufferPool() {
return bufferPool;
}
@VisibleForTesting
public IOException getIoException() {
return ioException;
}
@VisibleForTesting
public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
return commitIndex2flushedDataMap;
}
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
checkOpen(); checkOpen();
@ -206,9 +252,9 @@ public void write(byte[] b, int off, int len) throws IOException {
if (len == 0) { if (len == 0) {
return; return;
} }
while (len > 0) { while (len > 0) {
int writeLen; int writeLen;
// Allocate a buffer if needed. The buffer will be allocated only // Allocate a buffer if needed. The buffer will be allocated only
// once as needed and will be reused again for multiple blockOutputStream // once as needed and will be reused again for multiple blockOutputStream
// entries. // entries.
@ -224,8 +270,8 @@ public void write(byte[] b, int off, int len) throws IOException {
len -= writeLen; len -= writeLen;
writtenDataLength += writeLen; writtenDataLength += writeLen;
if (shouldFlush()) { if (shouldFlush()) {
totalDataFlushedLength += streamBufferFlushSize; updateFlushLength();
handlePartialFlush(); executePutBlock();
} }
// Data in the bufferPool can not exceed streamBufferMaxSize // Data in the bufferPool can not exceed streamBufferMaxSize
if (isBufferPoolFull()) { if (isBufferPoolFull()) {
@ -235,7 +281,11 @@ public void write(byte[] b, int off, int len) throws IOException {
} }
private boolean shouldFlush() { private boolean shouldFlush() {
return writtenDataLength % streamBufferFlushSize == 0; return bufferPool.computeBufferData() % streamBufferFlushSize == 0;
}
private void updateFlushLength() {
totalDataFlushedLength += writtenDataLength - totalDataFlushedLength;
} }
private boolean isBufferPoolFull() { private boolean isBufferPoolFull() {
@ -264,17 +314,17 @@ public void writeOnRetry(long len) throws IOException {
len -= writeLen; len -= writeLen;
count++; count++;
writtenDataLength += writeLen; writtenDataLength += writeLen;
if (shouldFlush()) { // we should not call isBufferFull/shouldFlush here.
// The buffer might already be full as whole data is already cached in
// the buffer. We should just validate
// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
// call for handling full buffer/flush buffer condition.
if (writtenDataLength % streamBufferFlushSize == 0) {
// reset the position to zero as now we will be reading the // reset the position to zero as now we will be reading the
// next buffer in the list // next buffer in the list
totalDataFlushedLength += streamBufferFlushSize; updateFlushLength();
handlePartialFlush(); executePutBlock();
} }
// we should not call isBufferFull here. The buffer might already be full
// as whole data is already cached in the buffer. We should just validate
// if we wrote data of size streamBufferMaxSize to call for handling
// full buffer condition.
if (writtenDataLength == streamBufferMaxSize) { if (writtenDataLength == streamBufferMaxSize) {
handleFullBuffer(); handleFullBuffer();
} }
@ -289,25 +339,22 @@ private void updateFlushIndex(List<Long> indexes) {
Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty()); Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
for (long index : indexes) { for (long index : indexes) {
Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index)); Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
long length = commitIndex2flushedDataMap.remove(index); List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
long length = buffers.stream().mapToLong(value -> {
// totalAckDataLength replicated yet should always be less than equal to int pos = value.position();
// the current length being returned from commitIndex2flushedDataMap. Preconditions.checkArgument(pos <= chunkSize);
// The below precondition would ensure commitIndex2flushedDataMap entries return pos;
// are removed in order of the insertion to the map. }).sum();
Preconditions.checkArgument(totalAckDataLength < length); // totalAckDataLength replicated yet should always be incremented
totalAckDataLength = length; // with the current length being returned from commitIndex2flushedDataMap.
totalAckDataLength += length;
LOG.debug("Total data successfully replicated: " + totalAckDataLength); LOG.debug("Total data successfully replicated: " + totalAckDataLength);
futureMap.remove(totalAckDataLength); futureMap.remove(totalAckDataLength);
// Flush has been committed to required servers successful. // Flush has been committed to required servers successful.
// just release the current buffer from the buffer pool. // just release the current buffer from the buffer pool corresponding
// to the buffers that have been committed with the putBlock call.
// every entry removed from the putBlock future Map signifies for (ByteBuffer byteBuffer : buffers) {
// streamBufferFlushSize/chunkSize no of chunks successfully committed. bufferPool.releaseBuffer(byteBuffer);
// Release the buffers from the buffer pool to be reused again.
int chunkCount = (int) (streamBufferFlushSize / chunkSize);
for (int i = 0; i < chunkCount; i++) {
bufferPool.releaseBuffer();
} }
} }
} }
@ -325,9 +372,10 @@ private void handleFullBuffer() throws IOException {
waitOnFlushFutures(); waitOnFlushFutures();
} }
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
adjustBuffersOnException(); ioException = new IOException(
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e); "Unexpected Storage Container Exception: " + e.toString(), e);
adjustBuffersOnException();
throw ioException;
} }
if (!commitIndex2flushedDataMap.isEmpty()) { if (!commitIndex2flushedDataMap.isEmpty()) {
watchForCommit( watchForCommit(
@ -389,10 +437,14 @@ private void watchForCommit(long commitIndex) throws IOException {
} }
private CompletableFuture<ContainerProtos. private CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> handlePartialFlush() ContainerCommandResponseProto> executePutBlock()
throws IOException { throws IOException {
checkOpen(); checkOpen();
long flushPos = totalDataFlushedLength; long flushPos = totalDataFlushedLength;
Preconditions.checkNotNull(bufferList);
List<ByteBuffer> byteBufferList = bufferList;
bufferList = null;
Preconditions.checkNotNull(byteBufferList);
String requestId = String requestId =
traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID; traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID;
CompletableFuture<ContainerProtos. CompletableFuture<ContainerProtos.
@ -410,17 +462,22 @@ ContainerCommandResponseProto> handlePartialFlush()
} }
// if the ioException is not set, putBlock is successful // if the ioException is not set, putBlock is successful
if (ioException == null) { if (ioException == null) {
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " commitMap size "
+ commitIndex2flushedDataMap.size());
BlockID responseBlockID = BlockID.getFromProtobuf( BlockID responseBlockID = BlockID.getFromProtobuf(
e.getPutBlock().getCommittedBlockLength().getBlockID()); e.getPutBlock().getCommittedBlockLength().getBlockID());
Preconditions.checkState(blockID.getContainerBlockID() Preconditions.checkState(blockID.getContainerBlockID()
.equals(responseBlockID.getContainerBlockID())); .equals(responseBlockID.getContainerBlockID()));
// updates the bcsId of the block // updates the bcsId of the block
blockID = responseBlockID; blockID = responseBlockID;
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " commitMap size "
+ commitIndex2flushedDataMap.size() + " flushLength "
+ flushPos + " numBuffers " + byteBufferList.size()
+ " blockID " + blockID + " bufferPool size" + bufferPool
.getSize() + " currentBufferIndex " + bufferPool
.getCurrentBufferIndex());
// for standalone protocol, logIndex will always be 0. // for standalone protocol, logIndex will always be 0.
commitIndex2flushedDataMap.put(asyncReply.getLogIndex(), flushPos); commitIndex2flushedDataMap
.put(asyncReply.getLogIndex(), byteBufferList);
} }
return e; return e;
}, responseExecutor).exceptionally(e -> { }, responseExecutor).exceptionally(e -> {
@ -446,9 +503,12 @@ public void flush() throws IOException {
try { try {
handleFlush(); handleFlush();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
adjustBuffersOnException(); // just set the exception here as well in order to maintain sanctity of
throw new IOException( // ioException field
ioException = new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e); "Unexpected Storage Container Exception: " + e.toString(), e);
adjustBuffersOnException();
throw ioException;
} }
} }
} }
@ -456,6 +516,15 @@ public void flush() throws IOException {
private void writeChunk(ByteBuffer buffer) private void writeChunk(ByteBuffer buffer)
throws IOException { throws IOException {
// This data in the buffer will be pushed to datanode and a reference will
// be added to the bufferList. Once putBlock gets executed, this list will
// be marked null. Hence, during first writeChunk call after every putBlock
// call or during the first call to writeChunk here, the list will be null.
if (bufferList == null) {
bufferList = new ArrayList<>();
}
bufferList.add(buffer);
// Please note : We are not flipping the slice when we write since // Please note : We are not flipping the slice when we write since
// the slices are pointing the currentBuffer start and end as needed for // the slices are pointing the currentBuffer start and end as needed for
// the chunk write. Also please note, Duplicate does not create a // the chunk write. Also please note, Duplicate does not create a
@ -472,20 +541,36 @@ private void handleFlush()
checkOpen(); checkOpen();
// flush the last chunk data residing on the currentBuffer // flush the last chunk data residing on the currentBuffer
if (totalDataFlushedLength < writtenDataLength) { if (totalDataFlushedLength < writtenDataLength) {
ByteBuffer currentBuffer = bufferPool.getBuffer(); ByteBuffer currentBuffer = bufferPool.getCurrentBuffer();
int pos = currentBuffer.position(); Preconditions.checkArgument(currentBuffer.position() > 0);
writeChunk(currentBuffer); if (currentBuffer.position() != chunkSize) {
totalDataFlushedLength += pos; writeChunk(currentBuffer);
handlePartialFlush(); }
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
updateFlushLength();
executePutBlock();
} }
waitOnFlushFutures(); waitOnFlushFutures();
if (!commitIndex2flushedDataMap.isEmpty()) {
// wait for the last commit index in the commitIndex2flushedDataMap
// to get committed to all or majority of nodes in case timeout
// happens.
long lastIndex =
commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
.max().getAsLong();
LOG.debug(
"waiting for last flush Index " + lastIndex + " to catch up");
watchForCommit(lastIndex);
}
// just check again if the exception is hit while waiting for the // just check again if the exception is hit while waiting for the
// futures to ensure flush has indeed succeeded // futures to ensure flush has indeed succeeded
// irrespective of whether the commitIndex2flushedDataMap is empty // irrespective of whether the commitIndex2flushedDataMap is empty
// or not, ensure there is no exception set // or not, ensure there is no exception set
checkOpen(); checkOpen();
} }
@Override @Override
@ -494,21 +579,11 @@ public void close() throws IOException {
&& bufferPool != null && bufferPool.getSize() > 0) { && bufferPool != null && bufferPool.getSize() > 0) {
try { try {
handleFlush(); handleFlush();
if (!commitIndex2flushedDataMap.isEmpty()) {
// wait for the last commit index in the commitIndex2flushedDataMap
// to get committed to all or majority of nodes in case timeout
// happens.
long lastIndex =
commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
.max().getAsLong();
LOG.debug(
"waiting for last flush Index " + lastIndex + " to catch up");
watchForCommit(lastIndex);
}
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
adjustBuffersOnException(); ioException = new IOException(
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e); "Unexpected Storage Container Exception: " + e.toString(), e);
adjustBuffersOnException();
throw ioException;
} finally { } finally {
cleanup(false); cleanup(false);
} }
@ -564,6 +639,10 @@ public void cleanup(boolean invalidateClient) {
futureMap.clear(); futureMap.clear();
} }
futureMap = null; futureMap = null;
if (bufferList != null) {
bufferList.clear();
}
bufferList = null;
if (commitIndex2flushedDataMap != null) { if (commitIndex2flushedDataMap != null) {
commitIndex2flushedDataMap.clear(); commitIndex2flushedDataMap.clear();
} }

View File

@ -41,7 +41,7 @@ public BufferPool(int bufferSize, int capacity) {
currentBufferIndex = -1; currentBufferIndex = -1;
} }
public ByteBuffer getBuffer() { public ByteBuffer getCurrentBuffer() {
return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex); return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex);
} }
@ -56,7 +56,7 @@ public ByteBuffer getBuffer() {
* *
*/ */
public ByteBuffer allocateBufferIfNeeded() { public ByteBuffer allocateBufferIfNeeded() {
ByteBuffer buffer = getBuffer(); ByteBuffer buffer = getCurrentBuffer();
if (buffer != null && buffer.hasRemaining()) { if (buffer != null && buffer.hasRemaining()) {
return buffer; return buffer;
} }
@ -74,11 +74,14 @@ public ByteBuffer allocateBufferIfNeeded() {
return buffer; return buffer;
} }
public void releaseBuffer() { public void releaseBuffer(ByteBuffer byteBuffer) {
// always remove from head of the list and append at last // always remove from head of the list and append at last
ByteBuffer buffer = bufferList.remove(0); ByteBuffer buffer = bufferList.remove(0);
// Ensure the buffer to be removed is always at the head of the list.
Preconditions.checkArgument(buffer.equals(byteBuffer));
buffer.clear(); buffer.clear();
bufferList.add(buffer); bufferList.add(buffer);
Preconditions.checkArgument(currentBufferIndex >= 0);
currentBufferIndex--; currentBufferIndex--;
} }
@ -90,6 +93,7 @@ public void clearBufferPool() {
public void checkBufferPoolEmpty() { public void checkBufferPoolEmpty() {
Preconditions.checkArgument(computeBufferData() == 0); Preconditions.checkArgument(computeBufferData() == 0);
} }
public long computeBufferData() { public long computeBufferData() {
return bufferList.stream().mapToInt(value -> value.position()) return bufferList.stream().mapToInt(value -> value.position())
.sum(); .sum();
@ -99,8 +103,12 @@ public int getSize() {
return bufferList.size(); return bufferList.size();
} }
ByteBuffer getBuffer(int index) { public ByteBuffer getBuffer(int index) {
return bufferList.get(index); return bufferList.get(index);
} }
int getCurrentBufferIndex() {
return currentBufferIndex;
}
} }

View File

@ -74,8 +74,8 @@ public void setContainerBlockID(ContainerBlockID containerBlockID) {
@Override @Override
public String toString() { public String toString() {
return new StringBuffer().append(getContainerBlockID().toString()) return new StringBuilder().append(getContainerBlockID().toString())
.append(" bcId: ") .append(" bcsId: ")
.append(blockCommitSequenceId) .append(blockCommitSequenceId)
.toString(); .toString();
} }

View File

@ -20,6 +20,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -148,11 +149,11 @@ public void close() throws IOException {
} }
} }
long getTotalSuccessfulFlushedData() throws IOException { long getTotalAckDataLength() {
if (outputStream != null) { if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream; BlockOutputStream out = (BlockOutputStream) this.outputStream;
blockID = out.getBlockID(); blockID = out.getBlockID();
return out.getTotalSuccessfulFlushedData(); return out.getTotalAckDataLength();
} else { } else {
// For a pre allocated block for which no write has been initiated, // For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here. // the OutputStream will be null here.
@ -295,6 +296,7 @@ public BlockOutputStreamEntry build() {
} }
} }
@VisibleForTesting
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
return outputStream; return outputStream;
} }

View File

@ -64,6 +64,13 @@
*/ */
public class KeyOutputStream extends OutputStream { public class KeyOutputStream extends OutputStream {
/**
* Defines stream action while calling handleFlushOrClose.
*/
enum StreamAction {
FLUSH, CLOSE, FULL
}
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(KeyOutputStream.class); LoggerFactory.getLogger(KeyOutputStream.class);
@ -326,8 +333,7 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
} }
if (current.getRemaining() <= 0) { if (current.getRemaining() <= 0) {
// since the current block is already written close the stream. // since the current block is already written close the stream.
handleFlushOrClose(true); handleFlushOrClose(StreamAction.FULL);
currentStreamIndex += 1;
} }
len -= writeLen; len -= writeLen;
off += writeLen; off += writeLen;
@ -393,19 +399,21 @@ private void handleException(BlockOutputStreamEntry streamEntry,
boolean retryFailure = checkForRetryFailure(exception); boolean retryFailure = checkForRetryFailure(exception);
boolean closedContainerException = false; boolean closedContainerException = false;
if (!retryFailure) { if (!retryFailure) {
closedContainerException = checkIfContainerIsClosed(exception); closedContainerException = checkIfContainerIsClosed(t);
} }
PipelineID pipelineId = null; PipelineID pipelineId = null;
long totalSuccessfulFlushedData = long totalSuccessfulFlushedData =
streamEntry.getTotalSuccessfulFlushedData(); streamEntry.getTotalAckDataLength();
//set the correct length for the current stream //set the correct length for the current stream
streamEntry.setCurrentPosition(totalSuccessfulFlushedData); streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = computeBufferData(); long bufferedDataLen = computeBufferData();
LOG.warn("Encountered exception {}", exception); LOG.warn("Encountered exception {}. The last committed block length is {}, "
LOG.info( + "uncommitted data length is {}", exception,
"The last committed block length is {}, uncommitted data length is {}",
totalSuccessfulFlushedData, bufferedDataLen); totalSuccessfulFlushedData, bufferedDataLen);
Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize); Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
Preconditions.checkArgument(
streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData
== bufferedDataLen);
long containerId = streamEntry.getBlockID().getContainerID(); long containerId = streamEntry.getBlockID().getContainerID();
Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers(); Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
Preconditions.checkNotNull(failedServers); Preconditions.checkNotNull(failedServers);
@ -498,7 +506,7 @@ private boolean checkIfContainerIsClosed(Throwable t) {
return t instanceof ContainerNotOpenException; return t instanceof ContainerNotOpenException;
} }
private Throwable checkForException(IOException ioe) throws IOException { public Throwable checkForException(IOException ioe) throws IOException {
Throwable t = ioe.getCause(); Throwable t = ioe.getCause();
while (t != null) { while (t != null) {
for (Class<? extends Exception> cls : OzoneClientUtils for (Class<? extends Exception> cls : OzoneClientUtils
@ -513,7 +521,7 @@ private Throwable checkForException(IOException ioe) throws IOException {
} }
private long getKeyLength() { private long getKeyLength() {
return streamEntries.parallelStream().mapToLong(e -> e.getCurrentPosition()) return streamEntries.stream().mapToLong(e -> e.getCurrentPosition())
.sum(); .sum();
} }
@ -535,16 +543,24 @@ private void allocateNewBlock(int index) throws IOException {
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
checkNotClosed(); checkNotClosed();
handleFlushOrClose(false); handleFlushOrClose(StreamAction.FLUSH);
} }
/** /**
* Close or Flush the latest outputStream. * Close or Flush the latest outputStream depending upon the action.
* @param close Flag which decides whether to call close or flush on the * This function gets called when while write is going on, the current stream
* gets full or explicit flush or close request is made by client. when the
* stream gets full and we try to close the stream , we might end up hitting
* an exception in the exception handling path, we write the data residing in
* in the buffer pool to a new Block. In cases, as such, when the data gets
* written to new stream , it will be at max half full. In such cases, we
* should just write the data and not close the stream as the block won't be
* completely full.
* @param op Flag which decides whether to call close or flush on the
* outputStream. * outputStream.
* @throws IOException In case, flush or close fails with exception. * @throws IOException In case, flush or close fails with exception.
*/ */
private void handleFlushOrClose(boolean close) throws IOException { private void handleFlushOrClose(StreamAction op) throws IOException {
if (streamEntries.size() == 0) { if (streamEntries.size() == 0) {
return; return;
} }
@ -561,10 +577,21 @@ private void handleFlushOrClose(boolean close) throws IOException {
if (failedServers != null && !failedServers.isEmpty()) { if (failedServers != null && !failedServers.isEmpty()) {
excludeList.addDatanodes(failedServers); excludeList.addDatanodes(failedServers);
} }
if (close) { switch (op) {
case CLOSE:
entry.close(); entry.close();
} else { break;
case FULL:
if (entry.getRemaining() == 0) {
entry.close();
currentStreamIndex++;
}
break;
case FLUSH:
entry.flush(); entry.flush();
break;
default:
throw new IOException("Invalid Operation");
} }
} catch (IOException ioe) { } catch (IOException ioe) {
handleException(entry, streamIndex, ioe); handleException(entry, streamIndex, ioe);
@ -587,7 +614,7 @@ public void close() throws IOException {
} }
closed = true; closed = true;
try { try {
handleFlushOrClose(true); handleFlushOrClose(StreamAction.CLOSE);
if (keyArgs != null) { if (keyArgs != null) {
// in test, this could be null // in test, this could be null
removeEmptyBlocks(); removeEmptyBlocks();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone; package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@ -249,6 +250,7 @@ abstract class Builder {
protected Optional<Long> streamBufferFlushSize = Optional.empty(); protected Optional<Long> streamBufferFlushSize = Optional.empty();
protected Optional<Long> streamBufferMaxSize = Optional.empty(); protected Optional<Long> streamBufferMaxSize = Optional.empty();
protected Optional<Long> blockSize = Optional.empty(); protected Optional<Long> blockSize = Optional.empty();
protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
// Use relative smaller number of handlers for testing // Use relative smaller number of handlers for testing
protected int numOfOmHandlers = 20; protected int numOfOmHandlers = 20;
protected int numOfScmHandlers = 20; protected int numOfScmHandlers = 20;
@ -434,6 +436,11 @@ public Builder setNumOfOzoneManagers(int numOMs) {
return this; return this;
} }
public Builder setStreamBufferSizeUnit(StorageUnit unit) {
this.streamBufferSizeUnit = Optional.of(unit);
return this;
}
public Builder setOMServiceId(String serviceId) { public Builder setOMServiceId(String serviceId) {
this.omServiceId = serviceId; this.omServiceId = serviceId;
return this; return this;

View File

@ -446,14 +446,18 @@ void initializeConfiguration() throws IOException {
if (!blockSize.isPresent()) { if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get()); blockSize = Optional.of(2 * streamBufferMaxSize.get());
} }
if (!streamBufferSizeUnit.isPresent()) {
streamBufferSizeUnit = Optional.of(StorageUnit.MB);
}
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
chunkSize.get(), StorageUnit.MB); chunkSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
streamBufferFlushSize.get(), StorageUnit.MB); streamBufferFlushSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
streamBufferMaxSize.get(), StorageUnit.MB); streamBufferMaxSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(), conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
StorageUnit.MB); streamBufferSizeUnit.get());
configureTrace(); configureTrace();
} }

View File

@ -0,0 +1,690 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests BlockOutputStream class.
*/
public class TestBlockOutputStream {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf = new OzoneConfiguration();
private static OzoneClient client;
private static ObjectStore objectStore;
private static int chunkSize;
private static int flushSize;
private static int maxFlushSize;
private static int blockSize;
private static String volumeName;
private static String bucketName;
private static String keyString;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
chunkSize = 100;
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
volumeName = "testblockoutputstream";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
private String getKeyName() {
return UUID.randomUUID().toString();
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testBufferCaching() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = 50;
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written data less than a chunk size, the data will just sit
// in the buffer, with only one buffer being allocated in the buffer pool
Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
//Just the writtenDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
// no data will be flushed till now
Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
Assert.assertEquals(pendingWriteChunkCount,
XceiverClientManager.getXceiverClientMetrics()
.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
XceiverClientManager.getXceiverClientMetrics()
.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// commitIndex2FlushedData Map will be empty here
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
// flush is a sync call, all pending operations will complete
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// we have just written data less than a chunk size, the data will just sit
// in the buffer, with only one buffer being allocated in the buffer pool
Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(0,
blockOutputStream.getBufferPool().getBuffer(0).position());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
// flush ensures watchForCommit updates the total length acknowledged
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 2,
metrics.getTotalOpCount());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@Test
public void testFlushChunk() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = flushSize;
// write data equal to 2 chunks
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
Assert.assertEquals(pendingWriteChunkCount + 2,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount + 1,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written data equal flush Size = 2 chunks, at this time
// buffer pool will have 2 buffers allocated worth of chunk size
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
// flush is a sync call, all pending operations will complete
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
// flush ensures watchForCommit updates the total length acknowledged
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@Test
public void testMultiChunkWrite() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = chunkSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
Assert.assertEquals(pendingWriteChunkCount + 1,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written data equal flush Size > 1 chunk, at this time
// buffer pool will have 2 buffers allocated worth of chunk size
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
// since data written is still less than flushLength, flushLength will
// still be 0.
Assert.assertEquals(0,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
// flush ensures watchForCommit updates the total length acknowledged
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount());
validateData(keyName, data1);
}
@Test
public void testMultiChunkWrite2() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = flushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
Assert.assertEquals(pendingWriteChunkCount + 2,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount + 1,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written data more than flush Size(2 chunks), at this time
// buffer pool will have 3 buffers allocated worth of chunk size
Assert.assertEquals(3, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(flushSize,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
Assert.assertEquals(flushSize,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertEquals(0,
blockOutputStream.getCommitIndex2flushedDataMap().size());
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
key.close();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 5,
metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@Test
public void testFullBufferCondition() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes atleast putBlock for first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up atleast one entry from the map where each
// entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@Test
public void testWriteWithExceedingMaxBufferLimit() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes atleast putBlock for first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up atleast one entry from the map where each
// entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// flush will make sure one more entry gets updated in the map
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
return ContainerTestHelper
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}
private void validateData(String keyName, byte[] data) throws Exception {
ContainerTestHelper
.validateData(keyName, data, objectStore, volumeName, bucketName);
}
}

View File

@ -0,0 +1,546 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests failure detection and handling in BlockOutputStream Class.
*/
public class TestBlockOutputStreamWithFailures {
private static MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
private OzoneClient client;
private ObjectStore objectStore;
private int chunkSize;
private int flushSize;
private int maxFlushSize;
private int blockSize;
private String volumeName;
private String bucketName;
private String keyString;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
@Before
public void init() throws Exception {
chunkSize = 100;
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
volumeName = "testblockoutputstream";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
private String getKeyName() {
return UUID.randomUUID().toString();
}
/**
* Shutdown MiniDFSCluster.
*/
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testWatchForCommitWithCloseContainerException() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes atleast putBlock for first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written data more than flush Size(2 chunks), at this time
// buffer pool will have 4 buffers allocated worth of chunk size
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up atleast one entry from the map where each
// entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
// flush is a sync call, all pending operations will complete
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// flush will make sure one more entry gets updated in the map
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
// Close the containers on the Datanode and write more data
ContainerTestHelper.waitForContainerClose(key, cluster);
// 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
// once exception is hit
key.write(data1);
// As a part of handling the exception, 4 failed writeChunks will be
// rewritten plus one partial chunk plus two putBlocks for flushSize
// and one flush for partial chunk
key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException);
// commitInfoMap will remain intact as there is no server failure
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 14,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 8,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
}
@Test
public void testWatchForCommitDatanodeFailure() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes at least putBlock for first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written data more than flush Size(2 chunks), at this time
// buffer pool will have 3 buffers allocated worth of chunk size
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
// since data written is still less than flushLength, flushLength will
// still be 0.
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// ack'd by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up atleast flushSize worth of data buffer
// where each entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// flush will make sure one more entry gets updated in the map
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
Pipeline pipeline = raftClient.getPipeline();
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
// again write data with more than max buffer limit. This will call
// watchForCommit again. Since the commit will happen 2 way, the
// commitInfoMap will get updated for servers which are alive
key.write(data1);
key.flush();
Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert
.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// in total, there are 8 full write chunks + 2 partial chunks written
Assert.assertEquals(writeChunkCount + 10,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
// 4 flushes at flushSize boundaries + 2 flush for partial chunks
Assert.assertEquals(putBlockCount + 6,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 16,
metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
}
@Test
public void test2DatanodesFailure() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes atleast putBlock for first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written data more than flush Size(2 chunks), at this time
// buffer pool will have 3 buffers allocated worth of chunk size
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equals to maxBufferSize is written, this will be a blocking
// call and hence will wait for atleast flushSize worth of data to get
// acked by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up atleast one entry from the map where each
// entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// flush will make sure one more entry gets updated in the map
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
Pipeline pipeline = raftClient.getPipeline();
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
// again write data with more than max buffer limit. This will call
// watchForCommit again. Since the commit will happen 2 way, the
// commitInfoMap will get updated for servers which are alive
// 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
// once exception is hit
key.write(data1);
// As a part of handling the exception, 4 failed writeChunks will be
// rewritten plus one partial chunk plus two putBlocks for flushSize
// and one flush for partial chunk
key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof AlreadyClosedException);
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 14,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 8,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
return ContainerTestHelper
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}
private void validateData(String keyName, byte[] data) throws Exception {
ContainerTestHelper
.validateData(keyName, data, objectStore, volumeName, bucketName);
}
}

View File

@ -338,17 +338,8 @@ public void testMultiBlockWrites3() throws Exception {
private void waitForContainerClose(OzoneOutputStream outputStream) private void waitForContainerClose(OzoneOutputStream outputStream)
throws Exception { throws Exception {
KeyOutputStream keyOutputStream =
(KeyOutputStream) outputStream.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
keyOutputStream.getLocationInfoList();
List<Long> containerIdList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
containerIdList.add(info.getContainerID());
}
Assert.assertTrue(!containerIdList.isEmpty());
ContainerTestHelper ContainerTestHelper
.waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); .waitForContainerClose(outputStream, cluster);
} }
@Ignore // test needs to be fixed after close container is handled for @Ignore // test needs to be fixed after close container is handled for

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.Checksum;
@ -65,6 +66,7 @@
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -719,6 +721,21 @@ public static String getFixedLengthString(String string, int length) {
return String.format("%1$" + length + "s", string); return String.format("%1$" + length + "s", string);
} }
public static void waitForContainerClose(OzoneOutputStream outputStream,
MiniOzoneCluster cluster) throws Exception {
KeyOutputStream keyOutputStream =
(KeyOutputStream) outputStream.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
keyOutputStream.getLocationInfoList();
List<Long> containerIdList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
containerIdList.add(info.getContainerID());
}
Assert.assertTrue(!containerIdList.isEmpty());
ContainerTestHelper
.waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
}
public static void waitForContainerClose(MiniOzoneCluster cluster, public static void waitForContainerClose(MiniOzoneCluster cluster,
Long... containerIdList) Long... containerIdList)
throws ContainerNotFoundException, PipelineNotFoundException, throws ContainerNotFoundException, PipelineNotFoundException,

View File

@ -134,7 +134,7 @@ public void testBlockDeletion() throws Exception {
String keyName = UUID.randomUUID().toString(); String keyName = UUID.randomUUID().toString();
OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length, OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, new HashMap<>()); ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>());
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
out.write(value.getBytes()); out.write(value.getBytes());
} }