diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index 779e63696a..73094346f8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -94,6 +94,10 @@ public ChunkOutputStream(BlockID blockID, String key, this.chunkIndex = 0; } + public ByteBuffer getBuffer() { + return buffer; + } + @Override public synchronized void write(int b) throws IOException { checkOpen(); @@ -106,7 +110,8 @@ public synchronized void write(int b) throws IOException { } @Override - public void write(byte[] b, int off, int len) throws IOException { + public synchronized void write(byte[] b, int off, int len) + throws IOException { if (b == null) { throw new NullPointerException(); } @@ -143,24 +148,27 @@ public synchronized void flush() throws IOException { @Override public synchronized void close() throws IOException { - if (xceiverClientManager != null && xceiverClient != null && - buffer != null) { + if (xceiverClientManager != null && xceiverClient != null + && buffer != null) { + if (buffer.position() > 0) { + writeChunkToContainer(); + } try { - if (buffer.position() > 0) { - writeChunkToContainer(); - } putKey(xceiverClient, containerKeyData.build(), traceID); } catch (IOException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } finally { - xceiverClientManager.releaseClient(xceiverClient); - xceiverClientManager = null; - xceiverClient = null; - buffer = null; + cleanup(); } } + } + public synchronized void cleanup() { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + buffer = null; } /** diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 83b4dfd933..988af07331 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -46,8 +47,10 @@ import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * Maintaining a list of ChunkInputStream. Write based on offset. @@ -111,6 +114,11 @@ public List getStreamEntries() { return streamEntries; } + @VisibleForTesting + public int getOpenID() { + return openID; + } + public ChunkGroupOutputStream( OpenKeySession handler, XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, @@ -220,26 +228,9 @@ public long getByteOffset() { @Override public synchronized void write(int b) throws IOException { - checkNotClosed(); - - if (streamEntries.size() <= currentStreamIndex) { - Preconditions.checkNotNull(omClient); - // allocate a new block, if a exception happens, log an error and - // throw exception to the caller directly, and the write fails. - try { - allocateNewBlock(currentStreamIndex); - } catch (IOException ioe) { - LOG.error("Allocate block fail when writing."); - throw ioe; - } - } - ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex); - entry.write(b); - incrementBlockLength(currentStreamIndex, 1); - if (entry.getRemaining() <= 0) { - currentStreamIndex += 1; - } - byteOffset += 1; + byte[] buf = new byte[1]; + buf[0] = (byte) b; + write(buf, 0, 1); } /** @@ -258,7 +249,10 @@ public synchronized void write(int b) throws IOException { public synchronized void write(byte[] b, int off, int len) throws IOException { checkNotClosed(); + handleWrite(b, off, len); + } + private void handleWrite(byte[] b, int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } @@ -288,10 +282,21 @@ public synchronized void write(byte[] b, int off, int len) // still do a sanity check. Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); - int writeLen = Math.min(len, (int)current.getRemaining()); - current.write(b, off, writeLen); + int writeLen = Math.min(len, (int) current.getRemaining()); + try { + current.write(b, off, writeLen); + } catch (IOException ioe) { + if (checkIfContainerIsClosed(ioe)) { + handleCloseContainerException(current, currentStreamIndex); + continue; + } else { + throw ioe; + } + } incrementBlockLength(currentStreamIndex, writeLen); if (current.getRemaining() <= 0) { + // since the current block is already written close the stream. + handleFlushOrClose(true); currentStreamIndex += 1; } len -= writeLen; @@ -300,6 +305,90 @@ public synchronized void write(byte[] b, int off, int len) } } + /** + * It performs following actions : + * a. Updates the committed length at datanode for the current stream in + * datanode. + * b. Reads the data from the underlying buffer and writes it the next stream. + * + * @param streamEntry StreamEntry + * @param streamIndex Index of the entry + * @throws IOException Throws IOexception if Write fails + */ + private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry, + int streamIndex) throws IOException { + // TODO : If the block is still not committed and is in the + // pending openBlock Map, it will return BLOCK_NOT_COMMITTED + // exception. We should handle this by retrying the same operation + // n times and update the OzoneManager with the actual block length + // written. At this point of time, we also need to allocate new blocks + // from a different container and may need to nullify + // all the remaining pre-allocated blocks in case they were + // pre-allocated on the same container which got closed now.This needs + // caching the closed container list on the client itself. + long committedLength = 0; + ByteBuffer buffer = streamEntry.getBuffer(); + if (buffer == null) { + // the buffer here will be null only when closeContainerException is + // hit while calling putKey during close on chunkOutputStream. + // Since closeContainer auto commit pending keys, no need to do + // anything here. + return; + } + + // In case where not a single chunk of data has been written to the Datanode + // yet. This block does not yet exist on the datanode but cached on the + // outputStream buffer. No need to call GetCommittedBlockLength here + // for this block associated with the stream here. + if (streamEntry.currentPosition >= chunkSize + || streamEntry.currentPosition != buffer.position()) { + ContainerProtos.GetCommittedBlockLengthResponseProto responseProto = + ContainerProtocolCalls + .getCommittedBlockLength(streamEntry.xceiverClient, + streamEntry.blockID, requestID); + committedLength = responseProto.getBlockLength(); + // update the length of the current stream + locationInfoList.get(streamIndex).setLength(committedLength); + } + + if (buffer.position() > 0) { + // If the data is still cached in the underlying stream, we need to + // allocate new block and write this data in the datanode. The cached + // data in the buffer does not exceed chunkSize. + Preconditions.checkState(buffer.position() < chunkSize); + currentStreamIndex += 1; + // readjust the byteOffset value to the length actually been written. + byteOffset -= buffer.position(); + handleWrite(buffer.array(), 0, buffer.position()); + } + + // just clean up the current stream. Since the container is already closed, + // it will be auto committed. No need to call close again here. + streamEntry.cleanup(); + // This case will arise when while writing the first chunk itself fails. + // In such case, the current block associated with the stream has no data + // written. Remove it from the current stream list. + if (committedLength == 0) { + streamEntries.remove(streamIndex); + locationInfoList.remove(streamIndex); + Preconditions.checkArgument(currentStreamIndex != 0); + currentStreamIndex -= 1; + } + } + + private boolean checkIfContainerIsClosed(IOException ioe) { + return Optional.of(ioe.getCause()) + .filter(e -> e instanceof StorageContainerException) + .map(e -> (StorageContainerException) e) + .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO) + .isPresent(); + } + + private long getKeyLength() { + return locationInfoList.parallelStream().mapToLong(e -> e.getLength()) + .sum(); + } + /** * Contact OM to get a new block. Set the new block with the index (e.g. * first block has index = 0, second has index = 1 etc.) @@ -317,11 +406,41 @@ private void allocateNewBlock(int index) throws IOException { @Override public synchronized void flush() throws IOException { checkNotClosed(); + handleFlushOrClose(false); + } + + /** + * Close or Flush the latest outputStream. + * @param close Flag which decides whether to call close or flush on the + * outputStream. + * @throws IOException In case, flush or close fails with exception. + */ + private void handleFlushOrClose(boolean close) throws IOException { if (streamEntries.size() == 0) { return; } - for (int i = 0; i <= currentStreamIndex; i++) { - streamEntries.get(i).flush(); + int size = streamEntries.size(); + int streamIndex = + currentStreamIndex >= size ? size - 1 : currentStreamIndex; + ChunkOutputStreamEntry entry = streamEntries.get(streamIndex); + if (entry != null) { + try { + if (close) { + entry.close(); + } else { + entry.flush(); + } + } catch (IOException ioe) { + if (checkIfContainerIsClosed(ioe)) { + // This call will allocate a new streamEntry and write the Data. + // Close needs to be retried on the newly allocated streamEntry as + // as well. + handleCloseContainerException(entry, streamIndex); + handleFlushOrClose(close); + } else { + throw ioe; + } + } } } @@ -336,16 +455,11 @@ public synchronized void close() throws IOException { return; } closed = true; - for (ChunkOutputStreamEntry entry : streamEntries) { - if (entry != null) { - entry.close(); - } - } + handleFlushOrClose(true); if (keyArgs != null) { // in test, this could be null - long length = - locationInfoList.parallelStream().mapToLong(e -> e.getLength()).sum(); - Preconditions.checkState(byteOffset == length); + Preconditions.checkState(streamEntries.size() == locationInfoList.size()); + Preconditions.checkState(byteOffset == getKeyLength()); keyArgs.setDataSize(byteOffset); keyArgs.setLocationInfoList(locationInfoList); omClient.commitKey(keyArgs, openID); @@ -506,6 +620,23 @@ public void close() throws IOException { this.outputStream.close(); } } + + ByteBuffer getBuffer() throws IOException { + if (this.outputStream instanceof ChunkOutputStream) { + ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + return out.getBuffer(); + } + throw new IOException("Invalid Output Stream for Key: " + key); + } + + public void cleanup() { + checkStream(); + if (this.outputStream instanceof ChunkOutputStream) { + ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + out.cleanup(); + } + } + } /** diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 3603964c62..f6e426539a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -125,19 +125,16 @@ public void updateLocationInfoList(List locationInfoList) { OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations(); List currentList = keyLocationInfoGroup.getLocationList(); - Preconditions.checkNotNull(keyLocationInfoGroup); - Preconditions.checkState(locationInfoList.size() <= currentList.size()); - for (OmKeyLocationInfo current : currentList) { - // For Versioning, while committing the key for the newer version, - // we just need to update the lengths for new blocks. Need to iterate over - // and find the new blocks added in the latest version. - for (OmKeyLocationInfo info : locationInfoList) { - if (info.getBlockID().equals(current.getBlockID())) { - current.setLength(info.getLength()); - break; - } - } - } + List latestVersionList = + keyLocationInfoGroup.getBlocksLatestVersionOnly(); + // Updates the latest locationList in the latest version only with + // given locationInfoList here. + // TODO : The original allocated list and the updated list here may vary + // as the containers on the Datanode on which the blocks were pre allocated + // might get closed. The diff of blocks between these two lists here + // need to be garbage collected in case the ozone client dies. + currentList.removeAll(latestVersionList); + currentList.addAll(locationInfoList); } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java new file mode 100644 index 0000000000..e5ecd81409 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** + * Tests Close Container Exception handling by Ozone Client. + */ +public class TestCloseContainerHandlingByClient { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static OzoneClient client; + private static ObjectStore objectStore; + private static int chunkSize; + private static int blockSize; + private static String volumeName; + private static String bucketName; + private static String keyString; + + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true and + * OZONE_HANDLER_TYPE_KEY = "distributed" + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, + OzoneConsts.OZONE_HANDLER_DISTRIBUTED); + chunkSize = (int)OzoneConsts.MB; + blockSize = 4 * chunkSize; + conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize); + conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4)); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(3).build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "closecontainerexceptionhandlingtest"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private static String fixedLengthString(String string, int length) { + return String.format("%1$"+length+ "s", string); + } + + @Test + public void testBlockWritesWithFlushAndClose() throws Exception { + String keyName = "standalone"; + OzoneOutputStream key = + createKey(keyName, ReplicationType.STAND_ALONE, 0); + // write data more than 1 chunk + byte[] data = + fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + key.write(data); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName) + .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) + .build(); + + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + key.write(data); + key.flush(); + key.close(); + // read the key from OM again and match the length.The length will still + // be the equal to the original data size. + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + List keyLocationInfos = + keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + //we have written two blocks + Assert.assertEquals(2, keyLocationInfos.size()); + OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); + Assert.assertEquals(data.length - (data.length % chunkSize), + omKeyLocationInfo.getLength()); + Assert.assertEquals(data.length + (data.length % chunkSize), + keyLocationInfos.get(1).getLength()); + Assert.assertEquals(2 * data.length, keyInfo.getDataSize()); + + // Written the same data twice + String dataString = new String(data); + dataString.concat(dataString); + validateData(keyName, dataString.getBytes()); + } + + @Test + public void testBlockWritesCloseConsistency() throws Exception { + String keyName = "standalone2"; + OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0); + // write data more than 1 chunk + byte[] data = + fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + key.write(data); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName) + .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) + .build(); + + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + key.close(); + // read the key from OM again and match the length.The length will still + // be the equal to the original data size. + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + List keyLocationInfos = + keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + // Though we have written only block initially, the close will hit + // closeContainerException and remaining data in the chunkOutputStream + // buffer will be copied into a different allocated block and will be + // committed. + Assert.assertEquals(2, keyLocationInfos.size()); + OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); + Assert.assertEquals(data.length - (data.length % chunkSize), + omKeyLocationInfo.getLength()); + Assert.assertEquals(data.length % chunkSize, + keyLocationInfos.get(1).getLength()); + Assert.assertEquals(data.length, keyInfo.getDataSize()); + validateData(keyName, data); + } + + @Test + public void testMultiBlockWrites() throws Exception { + + String keyName = "standalone3"; + OzoneOutputStream key = + createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize)); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + // With the initial size provided, it should have preallocated 3 blocks + Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); + // write data more than 1 chunk + byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes(); + Assert.assertEquals(data.length, 3 * blockSize); + key.write(data); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName) + .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) + .build(); + + waitForContainerClose(keyName, key, + HddsProtos.ReplicationType.STAND_ALONE); + // write 1 more block worth of data. It will fail and new block will be + // allocated + key.write(fixedLengthString(keyString, blockSize).getBytes()); + + key.close(); + // read the key from OM again and match the length.The length will still + // be the equal to the original data size. + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + List keyLocationInfos = + keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + // Though we have written only block initially, the close will hit + // closeContainerException and remaining data in the chunkOutputStream + // buffer will be copied into a different allocated block and will be + // committed. + Assert.assertEquals(4, keyLocationInfos.size()); + Assert.assertEquals(4 * blockSize, keyInfo.getDataSize()); + for (OmKeyLocationInfo locationInfo : keyLocationInfos) { + Assert.assertEquals(blockSize, locationInfo.getLength()); + } + } + + @Test + public void testMultiBlockWrites2() throws Exception { + + String keyName = "standalone4"; + long dataLength = 0; + OzoneOutputStream key = + createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + // With the initial size provided, it should have pre allocated 4 blocks + Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); + String dataString = fixedLengthString(keyString, (3 * blockSize)); + byte[] data = dataString.getBytes(); + key.write(data); + // 3 block are completely written to the DataNode in 3 blocks. + // Data of length half of chunkSize resides in the chunkOutput stream buffer + String dataString2 = fixedLengthString(keyString, chunkSize * 1 / 2); + key.write(dataString2.getBytes()); + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName) + .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) + .build(); + + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + + key.close(); + // read the key from OM again and match the length.The length will still + // be the equal to the original data size. + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + List keyLocationInfos = + keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + // Though we have written only block initially, the close will hit + // closeContainerException and remaining data in the chunkOutputStream + // buffer will be copied into a different allocated block and will be + // committed. + Assert.assertEquals(4, keyLocationInfos.size()); + dataLength = 3 * blockSize + (long) (0.5 * chunkSize); + Assert.assertEquals(dataLength, keyInfo.getDataSize()); + validateData(keyName, dataString.concat(dataString2).getBytes()); + } + + private void waitForContainerClose(String keyName, + OzoneOutputStream outputStream, HddsProtos.ReplicationType type) + throws Exception { + + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) outputStream.getOutputStream(); + int clientId = groupOutputStream.getOpenID(); + OMMetadataManager metadataManager = + cluster.getOzoneManager().getMetadataManager(); + String objectKey = + metadataManager.getKeyWithDBPrefix(volumeName, bucketName, keyName); + byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientId); + byte[] openKeyData = metadataManager.get(openKey); + OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf( + OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData)); + List locationInfoList = + keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + List containerIdList = new ArrayList<>(); + List pipelineList = new ArrayList<>(); + for (OmKeyLocationInfo info : locationInfoList) { + containerIdList.add(info.getContainerID()); + } + Assert.assertTrue(!containerIdList.isEmpty()); + for (long containerID : containerIdList) { + Pipeline pipeline = + cluster.getStorageContainerManager().getScmContainerManager() + .getContainerWithPipeline(containerID).getPipeline(); + pipelineList.add(pipeline); + List datanodes = pipeline.getMachines(); + for (DatanodeDetails details : datanodes) { + Assert.assertFalse(ContainerTestHelper + .isContainerClosed(cluster, containerID, details)); + // send the order to close the container + cluster.getStorageContainerManager().getScmNodeManager() + .addDatanodeCommand(details.getUuid(), + new CloseContainerCommand(containerID, type, pipeline.getId())); + } + } + + int index = 0; + for (long containerID : containerIdList) { + Pipeline pipeline = pipelineList.get(index); + List datanodes = pipeline.getMachines(); + for (DatanodeDetails datanodeDetails : datanodes) { + GenericTestUtils.waitFor(() -> ContainerTestHelper + .isContainerClosed(cluster, containerID, datanodeDetails), 500, + 15 * 1000); + //double check if it's really closed (waitFor also throws an exception) + Assert.assertTrue(ContainerTestHelper + .isContainerClosed(cluster, containerID, datanodeDetails)); + } + index++; + } + + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + ReplicationFactor factor = + type == ReplicationType.STAND_ALONE ? ReplicationFactor.ONE : + ReplicationFactor.THREE; + return objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey(keyName, size, type, factor); + } + + private void validateData(String keyName, byte[] data) throws Exception { + byte[] readData = new byte[data.length]; + OzoneInputStream is = + objectStore.getVolume(volumeName).getBucket(bucketName) + .readKey(keyName); + is.read(readData); + MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha1.update(data); + MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha2.update(readData); + Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest())); + is.close(); + } + + + @Test + public void testBlockWriteViaRatis() throws Exception { + String keyName = "ratis"; + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + byte[] data = + fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + key.write(data); + + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName). + setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE) + .setKeyName(keyName).build(); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + // Again Write the Data. This will throw an exception which will be handled + // and new blocks will be allocated + key.write(data); + key.flush(); + // The write will fail but exception will be handled and length will be + // updated correctly in OzoneManager once the steam is closed + key.close(); + // read the key from OM again and match the length.The length will still + // be the equal to the original data size. + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + List keyLocationInfos = + keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + //we have written two blocks + Assert.assertEquals(2, keyLocationInfos.size()); + OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); + Assert.assertEquals(data.length - (data.length % chunkSize), + omKeyLocationInfo.getLength()); + Assert.assertEquals(data.length + (data.length % chunkSize), + keyLocationInfos.get(1).getLength()); + Assert.assertEquals(2 * data.length, keyInfo.getDataSize()); + String dataString = new String(data); + dataString.concat(dataString); + validateData(keyName, dataString.getBytes()); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index ca92110ac7..dc166b5d0d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -21,6 +21,10 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.hdds.client.BlockID; @@ -604,4 +608,21 @@ public static BlockID getTestBlockID(long containerID) { public static long getTestContainerID() { return Time.getUtcTime(); } + + public static boolean isContainerClosed(MiniOzoneCluster cluster, + long containerID, DatanodeDetails datanode) { + ContainerData containerData; + for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) { + if (datanode.equals(datanodeService.getDatanodeDetails())) { + Container container = + datanodeService.getDatanodeStateMachine().getContainer() + .getContainerSet().getContainer(containerID); + if (container != null) { + containerData = container.getContainerData(); + return containerData.isClosed(); + } + } + } + return false; + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java index f5dddeed0a..0eb1677ca9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java @@ -39,12 +39,12 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.Assert; import org.junit.rules.ExpectedException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -124,8 +124,8 @@ public void testAllocateCommit() throws Exception { // 1st update, version 0 OpenKeySession openKey = ozoneManager.openKey(keyArgs); // explicitly set the keyLocation list before committing the key. - keyArgs.setLocationInfoList( - openKey.getKeyInfo().getLatestVersionLocations().getLocationList()); + keyArgs.setLocationInfoList(openKey.getKeyInfo().getLatestVersionLocations() + .getBlocksLatestVersionOnly()); ozoneManager.commitKey(keyArgs, openKey.getId()); OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs); @@ -139,8 +139,8 @@ public void testAllocateCommit() throws Exception { //OmKeyLocationInfo locationInfo = // ozoneManager.allocateBlock(keyArgs, openKey.getId()); // explicitly set the keyLocation list before committing the key. - keyArgs.setLocationInfoList( - openKey.getKeyInfo().getLatestVersionLocations().getLocationList()); + keyArgs.setLocationInfoList(openKey.getKeyInfo().getLatestVersionLocations() + .getBlocksLatestVersionOnly()); ozoneManager.commitKey(keyArgs, openKey.getId()); keyInfo = ozoneManager.lookupKey(keyArgs); @@ -150,10 +150,14 @@ public void testAllocateCommit() throws Exception { // 3rd update, version 2 openKey = ozoneManager.openKey(keyArgs); + // this block will be appended to the latest version of version 2. OmKeyLocationInfo locationInfo = ozoneManager.allocateBlock(keyArgs, openKey.getId()); - List locationInfoList = new ArrayList<>(); + List locationInfoList = + openKey.getKeyInfo().getLatestVersionLocations() + .getBlocksLatestVersionOnly(); + Assert.assertTrue(locationInfoList.size() == 1); locationInfoList.add(locationInfo); keyArgs.setLocationInfoList(locationInfoList); ozoneManager.commitKey(keyArgs, openKey.getId());