diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 94433179f9..83b4dfd933 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -76,7 +76,7 @@ public class ChunkGroupOutputStream extends OutputStream { private final int chunkSize; private final String requestID; private boolean closed; - + private List locationInfoList; /** * A constructor for testing purpose only. */ @@ -91,6 +91,7 @@ public ChunkGroupOutputStream() { chunkSize = 0; requestID = null; closed = false; + locationInfoList = null; } /** @@ -133,6 +134,7 @@ public ChunkGroupOutputStream( this.xceiverClientManager = xceiverClientManager; this.chunkSize = chunkSize; this.requestID = requestId; + this.locationInfoList = new ArrayList<>(); LOG.debug("Expecting open key with one block, but got" + info.getKeyLocationVersions().size()); } @@ -196,8 +198,19 @@ private void checkKeyLocationInfo(OmKeyLocationInfo subKeyInfo) streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, chunkSize, subKeyInfo.getLength())); + // reset the original length to zero here. It will be updated as and when + // the data gets written. + subKeyInfo.setLength(0); + locationInfoList.add(subKeyInfo); } + private void incrementBlockLength(int index, long length) { + if (locationInfoList != null) { + OmKeyLocationInfo locationInfo = locationInfoList.get(index); + long originalLength = locationInfo.getLength(); + locationInfo.setLength(originalLength + length); + } + } @VisibleForTesting public long getByteOffset() { @@ -222,6 +235,7 @@ public synchronized void write(int b) throws IOException { } ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex); entry.write(b); + incrementBlockLength(currentStreamIndex, 1); if (entry.getRemaining() <= 0) { currentStreamIndex += 1; } @@ -276,6 +290,7 @@ public synchronized void write(byte[] b, int off, int len) ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); int writeLen = Math.min(len, (int)current.getRemaining()); current.write(b, off, writeLen); + incrementBlockLength(currentStreamIndex, writeLen); if (current.getRemaining() <= 0) { currentStreamIndex += 1; } @@ -328,8 +343,13 @@ public synchronized void close() throws IOException { } if (keyArgs != null) { // in test, this could be null + long length = + locationInfoList.parallelStream().mapToLong(e -> e.getLength()).sum(); + Preconditions.checkState(byteOffset == length); keyArgs.setDataSize(byteOffset); + keyArgs.setLocationInfoList(locationInfoList); omClient.commitKey(keyArgs, openID); + locationInfoList = null; } else { LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java index 1f8ed5fb1e..aab35c57a4 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java @@ -19,6 +19,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import java.util.List; + /** * Args for key. Client use this to specify key's attributes on key creation * (putKey()). @@ -30,15 +32,18 @@ public final class OmKeyArgs { private long dataSize; private final ReplicationType type; private final ReplicationFactor factor; + private List locationInfoList; private OmKeyArgs(String volumeName, String bucketName, String keyName, - long dataSize, ReplicationType type, ReplicationFactor factor) { + long dataSize, ReplicationType type, ReplicationFactor factor, + List locationInfoList) { this.volumeName = volumeName; this.bucketName = bucketName; this.keyName = keyName; this.dataSize = dataSize; this.type = type; this.factor = factor; + this.locationInfoList = locationInfoList; } public ReplicationType getType() { @@ -69,6 +74,14 @@ public void setDataSize(long size) { dataSize = size; } + public void setLocationInfoList(List locationInfoList) { + this.locationInfoList = locationInfoList; + } + + public List getLocationInfoList() { + return locationInfoList; + } + /** * Builder class of OmKeyArgs. */ @@ -79,7 +92,7 @@ public static class Builder { private long dataSize; private ReplicationType type; private ReplicationFactor factor; - + private List locationInfoList; public Builder setVolumeName(String volume) { this.volumeName = volume; @@ -111,9 +124,14 @@ public Builder setFactor(ReplicationFactor replicationFactor) { return this; } + public Builder setLocationInfoList(List locationInfos) { + this.locationInfoList = locationInfos; + return this; + } + public OmKeyArgs build() { - return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, - type, factor); + return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type, + factor, locationInfoList); } } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 05c8d45fbf..3603964c62 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -101,8 +101,7 @@ public void setDataSize(long size) { this.dataSize = size; } - public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() - throws IOException { + public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() { return keyLocationVersions.size() == 0? null : keyLocationVersions.get(keyLocationVersions.size() - 1); } @@ -115,6 +114,32 @@ public void updateModifcationTime() { this.modificationTime = Time.monotonicNow(); } + /** + * updates the length of the each block in the list given. + * This will be called when the key is being committed to OzoneManager. + * + * @param locationInfoList list of locationInfo + * @throws IOException + */ + public void updateLocationInfoList(List locationInfoList) { + OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations(); + List currentList = + keyLocationInfoGroup.getLocationList(); + Preconditions.checkNotNull(keyLocationInfoGroup); + Preconditions.checkState(locationInfoList.size() <= currentList.size()); + for (OmKeyLocationInfo current : currentList) { + // For Versioning, while committing the key for the newer version, + // we just need to update the lengths for new blocks. Need to iterate over + // and find the new blocks added in the latest version. + for (OmKeyLocationInfo info : locationInfoList) { + if (info.getBlockID().equals(current.getBlockID())) { + current.setLength(info.getLength()); + break; + } + } + } + } + /** * Append a set of blocks to the latest version. Note that these blocks are * part of the latest version, not a new version. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index 3f6666df81..fae92f8a7d 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -27,7 +27,7 @@ public final class OmKeyLocationInfo { private final BlockID blockID; private final boolean shouldCreateContainer; // the id of this subkey in all the subkeys. - private final long length; + private long length; private final long offset; // the version number indicating when this block was added private long createVersion; @@ -68,6 +68,10 @@ public long getLength() { return length; } + public void setLength(long length) { + this.length = length; + } + public long getOffset() { return offset; } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 37151fb659..e557ac5173 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.om.protocolPB; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.protobuf.RpcController; @@ -581,11 +582,16 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) public void commitKey(OmKeyArgs args, int clientID) throws IOException { CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder(); + List locationInfoList = args.getLocationInfoList(); + Preconditions.checkNotNull(locationInfoList); KeyArgs keyArgs = KeyArgs.newBuilder() .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) .setKeyName(args.getKeyName()) - .setDataSize(args.getDataSize()).build(); + .setDataSize(args.getDataSize()) + .addAllKeyLocations( + locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf) + .collect(Collectors.toList())).build(); req.setKeyArgs(keyArgs); req.setClientID(clientID); diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 36b1c83efb..51a0a7fe4d 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -234,6 +234,7 @@ message KeyArgs { optional uint64 dataSize = 4; optional hadoop.hdds.ReplicationType type = 5; optional hadoop.hdds.ReplicationFactor factor = 6; + repeated KeyLocation keyLocations = 7; } message KeyLocation { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 2fbab361a7..e31b528c77 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.rpc; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -433,6 +434,40 @@ public void testPutKey() } } + @Test + public void testValidateBlockLengthWithCommitKey() throws IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + String value = RandomStringUtils.random(RandomUtils.nextInt(0,1024)); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + String keyName = UUID.randomUUID().toString(); + + // create the initial key with size 0, write will allocate the first block. + OzoneOutputStream out = bucket.createKey(keyName, 0, + ReplicationType.STAND_ALONE, ReplicationFactor.ONE); + out.write(value.getBytes()); + out.close(); + OmKeyArgs.Builder builder = new OmKeyArgs.Builder(); + builder.setVolumeName(volumeName).setBucketName(bucketName) + .setKeyName(keyName); + OmKeyInfo keyInfo = ozoneManager.lookupKey(builder.build()); + + List locationInfoList = + keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly(); + // LocationList should have only 1 block + Assert.assertEquals(1, locationInfoList.size()); + // make sure the data block size is updated + Assert.assertEquals(value.getBytes().length, + locationInfoList.get(0).getLength()); + // make sure the total data size is set correctly + Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize()); + } + + @Test public void testPutKeyRatisOneNode() throws IOException, OzoneException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java index 15122b94ce..f5dddeed0a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -122,6 +123,9 @@ public void testAllocateCommit() throws Exception { // 1st update, version 0 OpenKeySession openKey = ozoneManager.openKey(keyArgs); + // explicitly set the keyLocation list before committing the key. + keyArgs.setLocationInfoList( + openKey.getKeyInfo().getLatestVersionLocations().getLocationList()); ozoneManager.commitKey(keyArgs, openKey.getId()); OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs); @@ -134,6 +138,9 @@ public void testAllocateCommit() throws Exception { openKey = ozoneManager.openKey(keyArgs); //OmKeyLocationInfo locationInfo = // ozoneManager.allocateBlock(keyArgs, openKey.getId()); + // explicitly set the keyLocation list before committing the key. + keyArgs.setLocationInfoList( + openKey.getKeyInfo().getLatestVersionLocations().getLocationList()); ozoneManager.commitKey(keyArgs, openKey.getId()); keyInfo = ozoneManager.lookupKey(keyArgs); @@ -144,7 +151,11 @@ public void testAllocateCommit() throws Exception { // 3rd update, version 2 openKey = ozoneManager.openKey(keyArgs); // this block will be appended to the latest version of version 2. - ozoneManager.allocateBlock(keyArgs, openKey.getId()); + OmKeyLocationInfo locationInfo = + ozoneManager.allocateBlock(keyArgs, openKey.getId()); + List locationInfoList = new ArrayList<>(); + locationInfoList.add(locationInfo); + keyArgs.setLocationInfoList(locationInfoList); ozoneManager.commitKey(keyArgs, openKey.getId()); keyInfo = ozoneManager.lookupKey(keyArgs); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index ba92a29e81..75342c62f5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -342,6 +342,10 @@ public void commitKey(OmKeyArgs args, int clientID) throws IOException { OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData)); keyInfo.setDataSize(args.getDataSize()); keyInfo.setModificationTime(Time.now()); + List locationInfoList = args.getLocationInfoList(); + Preconditions.checkNotNull(locationInfoList); + //update the block length for each block + keyInfo.updateLocationInfoList(locationInfoList); BatchOperation batch = new BatchOperation(); batch.delete(openKey); batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 40a88b698a..45ec2d0412 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -519,9 +519,12 @@ public CommitKeyResponse commitKey(RpcController controller, .setVolumeName(keyArgs.getVolumeName()) .setBucketName(keyArgs.getBucketName()) .setKeyName(keyArgs.getKeyName()) - .setDataSize(keyArgs.getDataSize()) + .setLocationInfoList(keyArgs.getKeyLocationsList().stream() + .map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList())) .setType(type) .setFactor(factor) + .setDataSize(keyArgs.getDataSize()) .build(); int id = request.getClientID(); impl.commitKey(omKeyArgs, id);