HDDS-226. Client should update block length in OM while committing the key. Contributed by Shashikant Banerjee.

This commit is contained in:
Mukul Kumar Singh 2018-08-01 09:02:43 +05:30
parent 6310c0d17d
commit f4db753bb6
10 changed files with 138 additions and 11 deletions

View File

@ -76,7 +76,7 @@ public class ChunkGroupOutputStream extends OutputStream {
private final int chunkSize; private final int chunkSize;
private final String requestID; private final String requestID;
private boolean closed; private boolean closed;
private List<OmKeyLocationInfo> locationInfoList;
/** /**
* A constructor for testing purpose only. * A constructor for testing purpose only.
*/ */
@ -91,6 +91,7 @@ public ChunkGroupOutputStream() {
chunkSize = 0; chunkSize = 0;
requestID = null; requestID = null;
closed = false; closed = false;
locationInfoList = null;
} }
/** /**
@ -133,6 +134,7 @@ public ChunkGroupOutputStream(
this.xceiverClientManager = xceiverClientManager; this.xceiverClientManager = xceiverClientManager;
this.chunkSize = chunkSize; this.chunkSize = chunkSize;
this.requestID = requestId; this.requestID = requestId;
this.locationInfoList = new ArrayList<>();
LOG.debug("Expecting open key with one block, but got" + LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationVersions().size()); info.getKeyLocationVersions().size());
} }
@ -196,8 +198,19 @@ private void checkKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
chunkSize, subKeyInfo.getLength())); chunkSize, subKeyInfo.getLength()));
// reset the original length to zero here. It will be updated as and when
// the data gets written.
subKeyInfo.setLength(0);
locationInfoList.add(subKeyInfo);
} }
private void incrementBlockLength(int index, long length) {
if (locationInfoList != null) {
OmKeyLocationInfo locationInfo = locationInfoList.get(index);
long originalLength = locationInfo.getLength();
locationInfo.setLength(originalLength + length);
}
}
@VisibleForTesting @VisibleForTesting
public long getByteOffset() { public long getByteOffset() {
@ -222,6 +235,7 @@ public synchronized void write(int b) throws IOException {
} }
ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex); ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
entry.write(b); entry.write(b);
incrementBlockLength(currentStreamIndex, 1);
if (entry.getRemaining() <= 0) { if (entry.getRemaining() <= 0) {
currentStreamIndex += 1; currentStreamIndex += 1;
} }
@ -276,6 +290,7 @@ public synchronized void write(byte[] b, int off, int len)
ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
int writeLen = Math.min(len, (int)current.getRemaining()); int writeLen = Math.min(len, (int)current.getRemaining());
current.write(b, off, writeLen); current.write(b, off, writeLen);
incrementBlockLength(currentStreamIndex, writeLen);
if (current.getRemaining() <= 0) { if (current.getRemaining() <= 0) {
currentStreamIndex += 1; currentStreamIndex += 1;
} }
@ -328,8 +343,13 @@ public synchronized void close() throws IOException {
} }
if (keyArgs != null) { if (keyArgs != null) {
// in test, this could be null // in test, this could be null
long length =
locationInfoList.parallelStream().mapToLong(e -> e.getLength()).sum();
Preconditions.checkState(byteOffset == length);
keyArgs.setDataSize(byteOffset); keyArgs.setDataSize(byteOffset);
keyArgs.setLocationInfoList(locationInfoList);
omClient.commitKey(keyArgs, openID); omClient.commitKey(keyArgs, openID);
locationInfoList = null;
} else { } else {
LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
} }

View File

@ -19,6 +19,8 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import java.util.List;
/** /**
* Args for key. Client use this to specify key's attributes on key creation * Args for key. Client use this to specify key's attributes on key creation
* (putKey()). * (putKey()).
@ -30,15 +32,18 @@ public final class OmKeyArgs {
private long dataSize; private long dataSize;
private final ReplicationType type; private final ReplicationType type;
private final ReplicationFactor factor; private final ReplicationFactor factor;
private List<OmKeyLocationInfo> locationInfoList;
private OmKeyArgs(String volumeName, String bucketName, String keyName, private OmKeyArgs(String volumeName, String bucketName, String keyName,
long dataSize, ReplicationType type, ReplicationFactor factor) { long dataSize, ReplicationType type, ReplicationFactor factor,
List<OmKeyLocationInfo> locationInfoList) {
this.volumeName = volumeName; this.volumeName = volumeName;
this.bucketName = bucketName; this.bucketName = bucketName;
this.keyName = keyName; this.keyName = keyName;
this.dataSize = dataSize; this.dataSize = dataSize;
this.type = type; this.type = type;
this.factor = factor; this.factor = factor;
this.locationInfoList = locationInfoList;
} }
public ReplicationType getType() { public ReplicationType getType() {
@ -69,6 +74,14 @@ public void setDataSize(long size) {
dataSize = size; dataSize = size;
} }
public void setLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
this.locationInfoList = locationInfoList;
}
public List<OmKeyLocationInfo> getLocationInfoList() {
return locationInfoList;
}
/** /**
* Builder class of OmKeyArgs. * Builder class of OmKeyArgs.
*/ */
@ -79,7 +92,7 @@ public static class Builder {
private long dataSize; private long dataSize;
private ReplicationType type; private ReplicationType type;
private ReplicationFactor factor; private ReplicationFactor factor;
private List<OmKeyLocationInfo> locationInfoList;
public Builder setVolumeName(String volume) { public Builder setVolumeName(String volume) {
this.volumeName = volume; this.volumeName = volume;
@ -111,9 +124,14 @@ public Builder setFactor(ReplicationFactor replicationFactor) {
return this; return this;
} }
public Builder setLocationInfoList(List<OmKeyLocationInfo> locationInfos) {
this.locationInfoList = locationInfos;
return this;
}
public OmKeyArgs build() { public OmKeyArgs build() {
return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
type, factor); factor, locationInfoList);
} }
} }
} }

View File

@ -101,8 +101,7 @@ public void setDataSize(long size) {
this.dataSize = size; this.dataSize = size;
} }
public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() {
throws IOException {
return keyLocationVersions.size() == 0? null : return keyLocationVersions.size() == 0? null :
keyLocationVersions.get(keyLocationVersions.size() - 1); keyLocationVersions.get(keyLocationVersions.size() - 1);
} }
@ -115,6 +114,32 @@ public void updateModifcationTime() {
this.modificationTime = Time.monotonicNow(); this.modificationTime = Time.monotonicNow();
} }
/**
* updates the length of the each block in the list given.
* This will be called when the key is being committed to OzoneManager.
*
* @param locationInfoList list of locationInfo
* @throws IOException
*/
public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
List<OmKeyLocationInfo> currentList =
keyLocationInfoGroup.getLocationList();
Preconditions.checkNotNull(keyLocationInfoGroup);
Preconditions.checkState(locationInfoList.size() <= currentList.size());
for (OmKeyLocationInfo current : currentList) {
// For Versioning, while committing the key for the newer version,
// we just need to update the lengths for new blocks. Need to iterate over
// and find the new blocks added in the latest version.
for (OmKeyLocationInfo info : locationInfoList) {
if (info.getBlockID().equals(current.getBlockID())) {
current.setLength(info.getLength());
break;
}
}
}
}
/** /**
* Append a set of blocks to the latest version. Note that these blocks are * Append a set of blocks to the latest version. Note that these blocks are
* part of the latest version, not a new version. * part of the latest version, not a new version.

View File

@ -27,7 +27,7 @@ public final class OmKeyLocationInfo {
private final BlockID blockID; private final BlockID blockID;
private final boolean shouldCreateContainer; private final boolean shouldCreateContainer;
// the id of this subkey in all the subkeys. // the id of this subkey in all the subkeys.
private final long length; private long length;
private final long offset; private final long offset;
// the version number indicating when this block was added // the version number indicating when this block was added
private long createVersion; private long createVersion;
@ -68,6 +68,10 @@ public long getLength() {
return length; return length;
} }
public void setLength(long length) {
this.length = length;
}
public long getOffset() { public long getOffset() {
return offset; return offset;
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.ozone.om.protocolPB; package org.apache.hadoop.ozone.om.protocolPB;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -581,11 +582,16 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID)
public void commitKey(OmKeyArgs args, int clientID) public void commitKey(OmKeyArgs args, int clientID)
throws IOException { throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder(); CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
Preconditions.checkNotNull(locationInfoList);
KeyArgs keyArgs = KeyArgs.newBuilder() KeyArgs keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName()) .setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName()) .setBucketName(args.getBucketName())
.setKeyName(args.getKeyName()) .setKeyName(args.getKeyName())
.setDataSize(args.getDataSize()).build(); .setDataSize(args.getDataSize())
.addAllKeyLocations(
locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf)
.collect(Collectors.toList())).build();
req.setKeyArgs(keyArgs); req.setKeyArgs(keyArgs);
req.setClientID(clientID); req.setClientID(clientID);

View File

@ -234,6 +234,7 @@ message KeyArgs {
optional uint64 dataSize = 4; optional uint64 dataSize = 4;
optional hadoop.hdds.ReplicationType type = 5; optional hadoop.hdds.ReplicationType type = 5;
optional hadoop.hdds.ReplicationFactor factor = 6; optional hadoop.hdds.ReplicationFactor factor = 6;
repeated KeyLocation keyLocations = 7;
} }
message KeyLocation { message KeyLocation {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.client.rpc; package org.apache.hadoop.ozone.client.rpc;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -433,6 +434,40 @@ public void testPutKey()
} }
} }
@Test
public void testValidateBlockLengthWithCommitKey() throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String value = RandomStringUtils.random(RandomUtils.nextInt(0,1024));
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
String keyName = UUID.randomUUID().toString();
// create the initial key with size 0, write will allocate the first block.
OzoneOutputStream out = bucket.createKey(keyName, 0,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
out.write(value.getBytes());
out.close();
OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
builder.setVolumeName(volumeName).setBucketName(bucketName)
.setKeyName(keyName);
OmKeyInfo keyInfo = ozoneManager.lookupKey(builder.build());
List<OmKeyLocationInfo> locationInfoList =
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
// LocationList should have only 1 block
Assert.assertEquals(1, locationInfoList.size());
// make sure the data block size is updated
Assert.assertEquals(value.getBytes().length,
locationInfoList.get(0).getLength());
// make sure the total data size is set correctly
Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize());
}
@Test @Test
public void testPutKeyRatisOneNode() public void testPutKeyRatisOneNode()
throws IOException, OzoneException { throws IOException, OzoneException {

View File

@ -44,6 +44,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -122,6 +123,9 @@ public void testAllocateCommit() throws Exception {
// 1st update, version 0 // 1st update, version 0
OpenKeySession openKey = ozoneManager.openKey(keyArgs); OpenKeySession openKey = ozoneManager.openKey(keyArgs);
// explicitly set the keyLocation list before committing the key.
keyArgs.setLocationInfoList(
openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
ozoneManager.commitKey(keyArgs, openKey.getId()); ozoneManager.commitKey(keyArgs, openKey.getId());
OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs); OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
@ -134,6 +138,9 @@ public void testAllocateCommit() throws Exception {
openKey = ozoneManager.openKey(keyArgs); openKey = ozoneManager.openKey(keyArgs);
//OmKeyLocationInfo locationInfo = //OmKeyLocationInfo locationInfo =
// ozoneManager.allocateBlock(keyArgs, openKey.getId()); // ozoneManager.allocateBlock(keyArgs, openKey.getId());
// explicitly set the keyLocation list before committing the key.
keyArgs.setLocationInfoList(
openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
ozoneManager.commitKey(keyArgs, openKey.getId()); ozoneManager.commitKey(keyArgs, openKey.getId());
keyInfo = ozoneManager.lookupKey(keyArgs); keyInfo = ozoneManager.lookupKey(keyArgs);
@ -144,7 +151,11 @@ public void testAllocateCommit() throws Exception {
// 3rd update, version 2 // 3rd update, version 2
openKey = ozoneManager.openKey(keyArgs); openKey = ozoneManager.openKey(keyArgs);
// this block will be appended to the latest version of version 2. // this block will be appended to the latest version of version 2.
ozoneManager.allocateBlock(keyArgs, openKey.getId()); OmKeyLocationInfo locationInfo =
ozoneManager.allocateBlock(keyArgs, openKey.getId());
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
locationInfoList.add(locationInfo);
keyArgs.setLocationInfoList(locationInfoList);
ozoneManager.commitKey(keyArgs, openKey.getId()); ozoneManager.commitKey(keyArgs, openKey.getId());
keyInfo = ozoneManager.lookupKey(keyArgs); keyInfo = ozoneManager.lookupKey(keyArgs);

View File

@ -342,6 +342,10 @@ public void commitKey(OmKeyArgs args, int clientID) throws IOException {
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData)); OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
keyInfo.setDataSize(args.getDataSize()); keyInfo.setDataSize(args.getDataSize());
keyInfo.setModificationTime(Time.now()); keyInfo.setModificationTime(Time.now());
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
Preconditions.checkNotNull(locationInfoList);
//update the block length for each block
keyInfo.updateLocationInfoList(locationInfoList);
BatchOperation batch = new BatchOperation(); BatchOperation batch = new BatchOperation();
batch.delete(openKey); batch.delete(openKey);
batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray()); batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray());

View File

@ -519,9 +519,12 @@ public CommitKeyResponse commitKey(RpcController controller,
.setVolumeName(keyArgs.getVolumeName()) .setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName()) .setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName()) .setKeyName(keyArgs.getKeyName())
.setDataSize(keyArgs.getDataSize()) .setLocationInfoList(keyArgs.getKeyLocationsList().stream()
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList()))
.setType(type) .setType(type)
.setFactor(factor) .setFactor(factor)
.setDataSize(keyArgs.getDataSize())
.build(); .build();
int id = request.getClientID(); int id = request.getClientID();
impl.commitKey(omKeyArgs, id); impl.commitKey(omKeyArgs, id);