HDDS-383. Ozone Client should discard preallocated blocks from closed containers. Contributed by Shashikant Banerjee

Author: Tsz Wo Nicholas Sze, 2018-09-04 17:10:10 -07:00
parent 6e4c731471
commit 6883fe860f
3 changed files with 102 additions and 52 deletions

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -53,6 +53,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ListIterator;
/**
* Maintaining a list of ChunkOutputStream. Write based on offset.
@@ -81,7 +82,6 @@ public class ChunkGroupOutputStream extends OutputStream {
private final int chunkSize;
private final String requestID;
private boolean closed;
private List<OmKeyLocationInfo> locationInfoList;
private final RetryPolicy retryPolicy;
/**
* A constructor for testing purpose only.
@@ -97,7 +97,6 @@ public ChunkGroupOutputStream() {
chunkSize = 0;
requestID = null;
closed = false;
locationInfoList = null;
retryPolicy = null;
}
@@ -118,9 +117,16 @@ public List<ChunkOutputStreamEntry> getStreamEntries() {
return streamEntries;
}
@VisibleForTesting
public long getOpenID() {
return openID;
public List<OmKeyLocationInfo> getLocationInfoList() {
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
for (ChunkOutputStreamEntry streamEntry : streamEntries) {
OmKeyLocationInfo info =
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
.setShouldCreateContainer(false)
.setLength(streamEntry.currentPosition).setOffset(0).build();
locationInfoList.add(info);
}
return locationInfoList;
}
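With this change the client no longer maintains a separate locationInfoList field; per-block metadata is derived on demand from the stream entries. As a minimal sketch (hypothetical helper, not part of this commit; getLocationInfoList and getLength are from the diff above), a caller can recompute the key length from the derived list at any time:

  // Hypothetical helper: the sum of the derived per-block lengths
  // equals the number of bytes written to the key so far.
  static long totalLength(List<OmKeyLocationInfo> locationInfoList) {
    long total = 0;
    for (OmKeyLocationInfo info : locationInfoList) {
      total += info.getLength();
    }
    return total;
  }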
public ChunkGroupOutputStream(
@@ -146,7 +152,6 @@ public ChunkGroupOutputStream(
this.xceiverClientManager = xceiverClientManager;
this.chunkSize = chunkSize;
this.requestID = requestId;
this.locationInfoList = new ArrayList<>();
this.retryPolicy = retryPolicy;
LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationVersions().size());
@@ -211,18 +216,6 @@ private void checkKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
chunkSize, subKeyInfo.getLength()));
// reset the original length to zero here. It will be updated as and when
// the data gets written.
subKeyInfo.setLength(0);
locationInfoList.add(subKeyInfo);
}
private void incrementBlockLength(int index, long length) {
if (locationInfoList != null) {
OmKeyLocationInfo locationInfo = locationInfoList.get(index);
long originalLength = locationInfo.getLength();
locationInfo.setLength(originalLength + length);
}
}
@VisibleForTesting
@@ -298,7 +291,6 @@ private void handleWrite(byte[] b, int off, int len) throws IOException {
throw ioe;
}
}
incrementBlockLength(currentStreamIndex, writeLen);
if (current.getRemaining() <= 0) {
// since the current block is already written close the stream.
handleFlushOrClose(true);
@@ -316,12 +308,6 @@ private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry)
ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
RetryPolicy.RetryAction action;
int numRetries = 0;
// TODO : At this point of time, we also need to allocate new blocks
// from a different container and may need to nullify
// all the remaining pre-allocated blocks in case they were
// pre-allocated on the same container which got closed now. This needs
// caching the closed container list on the client itself.
while (true) {
try {
responseProto = ContainerProtocolCalls
@@ -366,6 +352,43 @@ private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry)
}
}
/**
* Discards the subsequent pre-allocated blocks and removes the
* corresponding entries from the streamEntries list for the closed container.
* @param containerID id of the closed container
*/
private void discardPreallocatedBlocks(long containerID) {
// currentStreamIndex < streamEntries.size() signifies that there are
// still pre-allocated blocks available.
if (currentStreamIndex < streamEntries.size()) {
ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
streamEntries.listIterator(currentStreamIndex);
while (streamEntryIterator.hasNext()) {
if (streamEntryIterator.next().blockID.getContainerID()
== containerID) {
streamEntryIterator.remove();
}
}
}
}
/**
* Blocks that were pre-allocated might never get written while the stream
* is closed normally. In such cases, trim the unused blocks from
* streamEntries, if any, so that only the block info actually used gets
* updated on the OzoneManager during close.
*/
private void removeEmptyBlocks() {
if (currentStreamIndex < streamEntries.size()) {
ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
streamEntries.listIterator(currentStreamIndex);
while (streamEntryIterator.hasNext()) {
if (streamEntryIterator.next().currentPosition == 0) {
streamEntryIterator.remove();
}
}
}
}
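Both discardPreallocatedBlocks and removeEmptyBlocks walk the unwritten tail of streamEntries with a ListIterator and differ only in their removal predicate. A minimal sketch of the shared pattern (hypothetical helper using java.util.function.Predicate; not part of this commit):

  // Hypothetical generalization of the two methods above: prune entries
  // in the unwritten tail of streamEntries that match a predicate.
  private void pruneUnwrittenEntries(
      Predicate<ChunkOutputStreamEntry> shouldRemove) {
    if (currentStreamIndex < streamEntries.size()) {
      ListIterator<ChunkOutputStreamEntry> iterator =
          streamEntries.listIterator(currentStreamIndex);
      while (iterator.hasNext()) {
        if (shouldRemove.test(iterator.next())) {
          iterator.remove();
        }
      }
    }
  }

discardPreallocatedBlocks would pass entry -> entry.blockID.getContainerID() == containerID, and removeEmptyBlocks would pass entry -> entry.currentPosition == 0.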
/**
* It performs the following actions:
* a. Updates the committed length at the datanode for the current stream in
@@ -396,7 +419,7 @@ private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
|| streamEntry.currentPosition != buffer.position()) {
committedLength = getCommittedBlockLength(streamEntry);
// update the length of the current stream
locationInfoList.get(streamIndex).setLength(committedLength);
streamEntry.currentPosition = committedLength;
}
if (buffer.position() > 0) {
@@ -418,10 +441,12 @@ private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
// written. Remove it from the current stream list.
if (committedLength == 0) {
streamEntries.remove(streamIndex);
locationInfoList.remove(streamIndex);
Preconditions.checkArgument(currentStreamIndex != 0);
currentStreamIndex -= 1;
}
// discard subsequent pre-allocated blocks in the streamEntries list
// that belong to the closed container
discardPreallocatedBlocks(streamEntry.blockID.getContainerID());
}
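A hedged summary of the recovery flow above, for a key with stream entries [e0, e1, e2] where e0 is being written and e1 was pre-allocated on the same container as e0 (hypothetical labels, following the code order):

  // 1. committedLength = getCommittedBlockLength(e0): ask the datanode
  //    how much of e0 was actually committed before the container closed.
  // 2. e0.currentPosition = committedLength: adopt the datanode's answer.
  // 3. If the local buffer still holds unacknowledged bytes
  //    (buffer.position() > 0), they are re-written to a new block.
  // 4. If committedLength == 0, e0 never held committed data, so it is
  //    removed from streamEntries and currentStreamIndex steps back.
  // 5. discardPreallocatedBlocks(containerID of e0) drops e1, which can
  //    no longer be written to the closed container.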
private boolean checkIfContainerIsClosed(IOException ioe) {
@@ -433,7 +458,7 @@ private boolean checkIfContainerIsClosed(IOException ioe) {
}
private long getKeyLength() {
return locationInfoList.parallelStream().mapToLong(e -> e.getLength())
return streamEntries.parallelStream().mapToLong(e -> e.currentPosition)
.sum();
}
@@ -506,12 +531,11 @@ public void close() throws IOException {
handleFlushOrClose(true);
if (keyArgs != null) {
// in test, this could be null
Preconditions.checkState(streamEntries.size() == locationInfoList.size());
removeEmptyBlocks();
Preconditions.checkState(byteOffset == getKeyLength());
keyArgs.setDataSize(byteOffset);
keyArgs.setLocationInfoList(locationInfoList);
keyArgs.setLocationInfoList(getLocationInfoList());
omClient.commitKey(keyArgs, openID);
locationInfoList = null;
} else {
LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
}

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java

@@ -122,6 +122,7 @@ public void updateModifcationTime() {
* @throws IOException
*/
public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
long latestVersion = getLatestVersionLocations().getVersion();
OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
List<OmKeyLocationInfo> currentList =
keyLocationInfoGroup.getLocationList();
@@ -134,6 +135,9 @@ public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
// might get closed. The diff of blocks between these two lists here
// needs to be garbage collected in case the ozone client dies.
currentList.removeAll(latestVersionList);
// set each locationInfo object to the latest version
locationInfoList.stream().forEach(omKeyLocationInfo -> omKeyLocationInfo
.setCreateVersion(latestVersion));
currentList.addAll(locationInfoList);
}
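To make the versioning semantics concrete, a hedged walk-through of updateLocationInfoList (hypothetical block labels):

  // Assume latestVersion == 1 and the blocks pre-allocated in version 1
  // were [b1, b2, b3]; the client commits locationInfoList = [b1, b2]
  // because b3 was never written.
  //   currentList.removeAll(latestVersionList)  -> drops [b1, b2, b3];
  //                                                 older versions stay put
  //   setCreateVersion(1) stamps b1 and b2
  //   currentList.addAll(locationInfoList)      -> re-adds [b1, b2]
  // Net effect: b3 drops out of the key; it is part of the diff that
  // must be garbage collected if the client dies before commit.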

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -38,12 +38,10 @@
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -247,9 +245,8 @@ public void testMultiBlockWrites() throws Exception {
@Test
public void testMultiBlockWrites2() throws Exception {
String keyName = "standalone4";
long dataLength = 0;
long dataLength;
OzoneOutputStream key =
createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize);
ChunkGroupOutputStream groupOutputStream =
@@ -293,11 +290,10 @@ public void testMultiBlockWrites2() throws Exception {
private void waitForContainerClose(String keyName,
OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
throws Exception {
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) outputStream.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
getLocationInfos(groupOutputStream, keyName);
groupOutputStream.getLocationInfoList();
List<Long> containerIdList = new ArrayList<>();
List<Pipeline> pipelineList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
@@ -335,6 +331,46 @@ private void waitForContainerClose(String keyName,
}
}
@Test
public void testDiscardPreallocatedBlocks() throws Exception {
String keyName = "discardpreallocatedblocks";
OzoneOutputStream key =
createKey(keyName, ReplicationType.STAND_ALONE, 2 * blockSize);
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) key.getOutputStream();
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
// With the initial size provided, it should have pre-allocated 2 blocks
Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
Assert.assertEquals(2, groupOutputStream.getLocationInfoList().size());
String dataString = fixedLengthString(keyString, (1 * blockSize));
byte[] data = dataString.getBytes();
key.write(data);
List<OmKeyLocationInfo> locationInfos =
new ArrayList<>(groupOutputStream.getLocationInfoList());
long containerID = locationInfos.get(0).getContainerID();
List<DatanodeDetails> datanodes =
cluster.getStorageContainerManager().getScmContainerManager()
.getContainerWithPipeline(containerID).getPipeline().getMachines();
Assert.assertEquals(1, datanodes.size());
waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
dataString = fixedLengthString(keyString, (1 * blockSize));
data = dataString.getBytes();
key.write(data);
Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
// the 1st block has been written. Now all the containers are closed, so
// the 2nd pre-allocated block will be removed from the list and a new
// block should have been allocated
Assert.assertTrue(
groupOutputStream.getLocationInfoList().get(0).getBlockID()
.equals(locationInfos.get(0).getBlockID()));
Assert.assertFalse(
groupOutputStream.getLocationInfoList().get(1).getBlockID()
.equals(locationInfos.get(1).getBlockID()));
key.close();
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
ReplicationFactor factor =
@@ -344,20 +380,6 @@ private OzoneOutputStream createKey(String keyName, ReplicationType type,
.createKey(keyName, size, type, factor);
}
private List<OmKeyLocationInfo> getLocationInfos(
ChunkGroupOutputStream groupOutputStream, String keyName)
throws IOException {
long clientId = groupOutputStream.getOpenID();
OMMetadataManager metadataManager =
cluster.getOzoneManager().getMetadataManager();
byte[] openKey = metadataManager
.getOpenKeyBytes(volumeName, bucketName, keyName, clientId);
byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
return keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
}
private void validateData(String keyName, byte[] data) throws Exception {
byte[] readData = new byte[data.length];
OzoneInputStream is =
@@ -427,7 +449,7 @@ public void testRetriesOnBlockNotCommittedException() throws Exception {
String dataString = fixedLengthString(keyString, (3 * chunkSize));
key.write(dataString.getBytes());
List<OmKeyLocationInfo> locationInfos =
getLocationInfos(groupOutputStream, keyName);
groupOutputStream.getLocationInfoList();
long containerID = locationInfos.get(0).getContainerID();
List<DatanodeDetails> datanodes =
cluster.getStorageContainerManager().getScmContainerManager()