HDDS-771. ChunkGroupOutputStream stream entries need to be properly updated on closed container exception. Contributed by Lokesh Jain.
This commit is contained in:
parent
2e8ac14dcb
commit
e0ac3081e9
@ -413,6 +413,11 @@ private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update currentStreamIndex in case of closed container exception. The
|
||||||
|
// current stream entry cannot be used for further writes because
|
||||||
|
// container is closed.
|
||||||
|
currentStreamIndex += 1;
|
||||||
|
|
||||||
// In case where not a single chunk of data has been written to the Datanode
|
// In case where not a single chunk of data has been written to the Datanode
|
||||||
// yet. This block does not yet exist on the datanode but cached on the
|
// yet. This block does not yet exist on the datanode but cached on the
|
||||||
// outputStream buffer. No need to call GetCommittedBlockLength here
|
// outputStream buffer. No need to call GetCommittedBlockLength here
|
||||||
@ -429,7 +434,6 @@ private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
|
|||||||
// allocate new block and write this data in the datanode. The cached
|
// allocate new block and write this data in the datanode. The cached
|
||||||
// data in the buffer does not exceed chunkSize.
|
// data in the buffer does not exceed chunkSize.
|
||||||
Preconditions.checkState(buffer.position() < chunkSize);
|
Preconditions.checkState(buffer.position() < chunkSize);
|
||||||
currentStreamIndex += 1;
|
|
||||||
// readjust the byteOffset value to the length actually been written.
|
// readjust the byteOffset value to the length actually been written.
|
||||||
byteOffset -= buffer.position();
|
byteOffset -= buffer.position();
|
||||||
handleWrite(buffer.array(), 0, buffer.position());
|
handleWrite(buffer.array(), 0, buffer.position());
|
||||||
|
@ -34,6 +34,8 @@
|
|||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||||
import org.apache.hadoop.ozone.client.OzoneClient;
|
import org.apache.hadoop.ozone.client.OzoneClient;
|
||||||
|
import org.apache.hadoop.ozone.client.OzoneBucket;
|
||||||
|
import org.apache.hadoop.ozone.client.OzoneVolume;
|
||||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||||
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
|
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
||||||
@ -287,6 +289,64 @@ public void testMultiBlockWrites2() throws Exception {
|
|||||||
validateData(keyName, dataString.concat(dataString2).getBytes());
|
validateData(keyName, dataString.concat(dataString2).getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiBlockWrites3() throws Exception {
|
||||||
|
|
||||||
|
String keyName = "standalone5";
|
||||||
|
int keyLen = 4 * blockSize;
|
||||||
|
OzoneOutputStream key =
|
||||||
|
createKey(keyName, ReplicationType.RATIS, keyLen);
|
||||||
|
ChunkGroupOutputStream groupOutputStream =
|
||||||
|
(ChunkGroupOutputStream) key.getOutputStream();
|
||||||
|
// With the initial size provided, it should have preallocated 4 blocks
|
||||||
|
Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
|
||||||
|
// write data 3 blocks and one more chunk
|
||||||
|
byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes();
|
||||||
|
byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + chunkSize);
|
||||||
|
Assert.assertEquals(data.length, 3 * blockSize + chunkSize);
|
||||||
|
key.write(data);
|
||||||
|
|
||||||
|
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
|
||||||
|
//get the name of a valid container
|
||||||
|
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||||
|
.setBucketName(bucketName)
|
||||||
|
.setType(HddsProtos.ReplicationType.RATIS)
|
||||||
|
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
waitForContainerClose(keyName, key,
|
||||||
|
HddsProtos.ReplicationType.RATIS);
|
||||||
|
// write 3 more chunks worth of data. It will fail and new block will be
|
||||||
|
// allocated. This write completes 4 blocks worth of data written to key
|
||||||
|
data = Arrays
|
||||||
|
.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
|
||||||
|
key.write(data);
|
||||||
|
|
||||||
|
key.close();
|
||||||
|
// read the key from OM again and match the length and data.
|
||||||
|
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||||
|
List<OmKeyLocationInfo> keyLocationInfos =
|
||||||
|
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
|
||||||
|
OzoneVolume volume = objectStore.getVolume(volumeName);
|
||||||
|
OzoneBucket bucket = volume.getBucket(bucketName);
|
||||||
|
OzoneInputStream inputStream = bucket.readKey(keyName);
|
||||||
|
byte[] readData = new byte[keyLen];
|
||||||
|
inputStream.read(readData);
|
||||||
|
Assert.assertArrayEquals(writtenData, readData);
|
||||||
|
|
||||||
|
// Though we have written only block initially, the close will hit
|
||||||
|
// closeContainerException and remaining data in the chunkOutputStream
|
||||||
|
// buffer will be copied into a different allocated block and will be
|
||||||
|
// committed.
|
||||||
|
Assert.assertEquals(5, keyLocationInfos.size());
|
||||||
|
Assert.assertEquals(4 * blockSize, keyInfo.getDataSize());
|
||||||
|
long length = 0;
|
||||||
|
for (OmKeyLocationInfo locationInfo : keyLocationInfos) {
|
||||||
|
length += locationInfo.getLength();
|
||||||
|
}
|
||||||
|
Assert.assertEquals(4 * blockSize, length);
|
||||||
|
}
|
||||||
|
|
||||||
private void waitForContainerClose(String keyName,
|
private void waitForContainerClose(String keyName,
|
||||||
OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
|
OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
Loading…
Reference in New Issue
Block a user