HDFS-12000. Ozone: Container : Add key versioning support-1. Contributed by Chen Liang.

This commit is contained in:
Xiaoyu Yao 2017-12-13 16:07:53 -08:00 committed by Owen O'Malley
parent c0c87dea9b
commit 8ff98e2af3
17 changed files with 582 additions and 74 deletions

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@ -166,13 +165,11 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient, String requestId)
throws IOException {
int index = 0;
long length = 0;
String containerKey;
ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
for (KsmKeyLocationInfo ksmKeyLocationInfo : keyInfo.getKeyLocationList()) {
// check index as sanity check
Preconditions.checkArgument(index++ == ksmKeyLocationInfo.getIndex());
for (KsmKeyLocationInfo ksmKeyLocationInfo :
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
String containerName = ksmKeyLocationInfo.getContainerName();
Pipeline pipeline =
storageContainerLocationClient.getContainer(containerName);

View File

@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
@ -43,6 +44,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
@ -98,6 +100,11 @@ public synchronized void addStream(OutputStream outputStream, long length) {
streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
}
@VisibleForTesting
public List<ChunkOutputStreamEntry> getStreamEntries() {
return streamEntries;
}
public ChunkGroupOutputStream(
OpenKeySession handler, XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
@ -122,12 +129,31 @@ public ChunkGroupOutputStream(
this.chunkSize = chunkSize;
this.requestID = requestId;
LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationList().size());
info.getKeyLocationVersions().size());
}
/**
* When a key is opened, it is possible that there are some blocks already
* allocated to it for this open session. In this case, to make use of these
* blocks, we need to add these blocks to stream entries. But, a key's version
* also includes blocks from previous versions, we need to avoid adding these
* old blocks to stream entries, because these old blocks should not be picked
* for write. To do this, the following method checks that, only those
* blocks created in this particular open version are added to stream entries.
*
* @param version the set of blocks that are pre-allocated.
* @param openVersion the version corresponding to the pre-allocation.
* @throws IOException
*/
public void addPreallocateBlocks(KsmKeyLocationInfoGroup version,
long openVersion) throws IOException {
// server may return any number of blocks, (0 to any)
int idx = 0;
for (KsmKeyLocationInfo subKeyInfo : info.getKeyLocationList()) {
subKeyInfo.setIndex(idx++);
checkKeyLocationInfo(subKeyInfo);
// only the blocks allocated in this open session (block createVersion
// equals to open session version)
for (KsmKeyLocationInfo subKeyInfo : version.getLocationList()) {
if (subKeyInfo.getCreateVersion() == openVersion) {
checkKeyLocationInfo(subKeyInfo);
}
}
}
@ -255,7 +281,6 @@ public synchronized void write(byte[] b, int off, int len)
*/
private void allocateNewBlock(int index) throws IOException {
KsmKeyLocationInfo subKeyInfo = ksmClient.allocateBlock(keyArgs, openID);
subKeyInfo.setIndex(index);
checkKeyLocationInfo(subKeyInfo);
}

View File

@ -464,6 +464,9 @@ public OzoneOutputStream createKey(
.setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
.setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
return new OzoneOutputStream(groupOutputStream);
}

View File

@ -17,9 +17,11 @@
*/
package org.apache.hadoop.ozone.ksm.helpers;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@ -34,18 +36,29 @@ public final class KsmKeyInfo {
// name of key client specified
private final String keyName;
private long dataSize;
private List<KsmKeyLocationInfo> keyLocationList;
private List<KsmKeyLocationInfoGroup> keyLocationVersions;
private final long creationTime;
private long modificationTime;
private KsmKeyInfo(String volumeName, String bucketName, String keyName,
List<KsmKeyLocationInfo> locationInfos, long dataSize, long creationTime,
long modificationTime) {
List<KsmKeyLocationInfoGroup> versions, long dataSize,
long creationTime, long modificationTime) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
this.dataSize = dataSize;
this.keyLocationList = locationInfos;
// it is important that the versions are ordered from old to new.
// Do this sanity check when versions got loaded on creating KsmKeyInfo.
// TODO : this is not necessary, here only because versioning is still a
// work in-progress, remove this following check when versioning is
// complete and prove correctly functioning
long currentVersion = -1;
for (KsmKeyLocationInfoGroup version : versions) {
Preconditions.checkArgument(
currentVersion + 1 == version.getVersion());
currentVersion = version.getVersion();
}
this.keyLocationVersions = versions;
this.creationTime = creationTime;
this.modificationTime = modificationTime;
}
@ -70,16 +83,64 @@ public void setDataSize(long size) {
this.dataSize = size;
}
public List<KsmKeyLocationInfo> getKeyLocationList() {
return keyLocationList;
public synchronized KsmKeyLocationInfoGroup getLatestVersionLocations()
throws IOException {
return keyLocationVersions.size() == 0? null :
keyLocationVersions.get(keyLocationVersions.size() - 1);
}
public List<KsmKeyLocationInfoGroup> getKeyLocationVersions() {
return keyLocationVersions;
}
public void updateModifcationTime() {
this.modificationTime = Time.monotonicNow();
}
public void appendKeyLocation(KsmKeyLocationInfo newLocation) {
keyLocationList.add(newLocation);
/**
* Append a set of blocks to the latest version. Note that these blocks are
* part of the latest version, not a new version.
*
* @param newLocationList the list of new blocks to be added.
* @throws IOException
*/
public synchronized void appendNewBlocks(
List<KsmKeyLocationInfo> newLocationList) throws IOException {
if (keyLocationVersions.size() == 0) {
throw new IOException("Appending new block, but no version exist");
}
KsmKeyLocationInfoGroup currentLatestVersion =
keyLocationVersions.get(keyLocationVersions.size() - 1);
currentLatestVersion.appendNewBlocks(newLocationList);
setModificationTime(Time.now());
}
/**
* Add a new set of blocks. The new blocks will be added as appending a new
* version to the all version list.
*
* @param newLocationList the list of new blocks to be added.
* @throws IOException
*/
public synchronized long addNewVersion(
List<KsmKeyLocationInfo> newLocationList) throws IOException {
long latestVersionNum;
if (keyLocationVersions.size() == 0) {
// no version exist, these blocks are the very first version.
keyLocationVersions.add(new KsmKeyLocationInfoGroup(0, newLocationList));
latestVersionNum = 0;
} else {
// it is important that the new version are always at the tail of the list
KsmKeyLocationInfoGroup currentLatestVersion =
keyLocationVersions.get(keyLocationVersions.size() - 1);
// the new version is created based on the current latest version
KsmKeyLocationInfoGroup newVersion =
currentLatestVersion.generateNextVersion(newLocationList);
keyLocationVersions.add(newVersion);
latestVersionNum = newVersion.getVersion();
}
setModificationTime(Time.now());
return latestVersionNum;
}
public long getCreationTime() {
@ -102,7 +163,7 @@ public static class Builder {
private String bucketName;
private String keyName;
private long dataSize;
private List<KsmKeyLocationInfo> ksmKeyLocationInfos;
private List<KsmKeyLocationInfoGroup> ksmKeyLocationInfoGroups;
private long creationTime;
private long modificationTime;
@ -122,8 +183,8 @@ public Builder setKeyName(String key) {
}
public Builder setKsmKeyLocationInfos(
List<KsmKeyLocationInfo> ksmKeyLocationInfoList) {
this.ksmKeyLocationInfos = ksmKeyLocationInfoList;
List<KsmKeyLocationInfoGroup> ksmKeyLocationInfoList) {
this.ksmKeyLocationInfoGroups = ksmKeyLocationInfoList;
return this;
}
@ -144,19 +205,23 @@ public Builder setModificationTime(long mTime) {
public KsmKeyInfo build() {
return new KsmKeyInfo(
volumeName, bucketName, keyName, ksmKeyLocationInfos,
volumeName, bucketName, keyName, ksmKeyLocationInfoGroups,
dataSize, creationTime, modificationTime);
}
}
public KeyInfo getProtobuf() {
long latestVersion = keyLocationVersions.size() == 0 ? -1 :
keyLocationVersions.get(keyLocationVersions.size() - 1).getVersion();
return KeyInfo.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setDataSize(dataSize)
.addAllKeyLocationList(keyLocationList.stream()
.map(KsmKeyLocationInfo::getProtobuf).collect(Collectors.toList()))
.addAllKeyLocationList(keyLocationVersions.stream()
.map(KsmKeyLocationInfoGroup::getProtobuf)
.collect(Collectors.toList()))
.setLatestVersion(latestVersion)
.setCreationTime(creationTime)
.setModificationTime(modificationTime)
.build();
@ -168,7 +233,7 @@ public static KsmKeyInfo getFromProtobuf(KeyInfo keyInfo) {
keyInfo.getBucketName(),
keyInfo.getKeyName(),
keyInfo.getKeyLocationListList().stream()
.map(KsmKeyLocationInfo::getFromProtobuf)
.map(KsmKeyLocationInfoGroup::getFromProtobuf)
.collect(Collectors.toList()),
keyInfo.getDataSize(),
keyInfo.getCreationTime(),

View File

@ -28,21 +28,29 @@ public final class KsmKeyLocationInfo {
private final String blockID;
private final boolean shouldCreateContainer;
// the id of this subkey in all the subkeys.
private int index;
private final long length;
private final long offset;
// the version number indicating when this block was added
private long createVersion;
private KsmKeyLocationInfo(String containerName,
String blockID, boolean shouldCreateContainer, int index,
String blockID, boolean shouldCreateContainer,
long length, long offset) {
this.containerName = containerName;
this.blockID = blockID;
this.shouldCreateContainer = shouldCreateContainer;
this.index = index;
this.length = length;
this.offset = offset;
}
public void setCreateVersion(long version) {
createVersion = version;
}
public long getCreateVersion() {
return createVersion;
}
public String getContainerName() {
return containerName;
}
@ -55,14 +63,6 @@ public boolean getShouldCreateContainer() {
return shouldCreateContainer;
}
public int getIndex() {
return index;
}
public void setIndex(int idx) {
index = idx;
}
public long getLength() {
return length;
}
@ -78,10 +78,9 @@ public static class Builder {
private String containerName;
private String blockID;
private boolean shouldCreateContainer;
// the id of this subkey in all the subkeys.
private int index;
private long length;
private long offset;
public Builder setContainerName(String container) {
this.containerName = container;
return this;
@ -97,11 +96,6 @@ public Builder setShouldCreateContainer(boolean create) {
return this;
}
public Builder setIndex(int id) {
this.index = id;
return this;
}
public Builder setLength(long len) {
this.length = len;
return this;
@ -114,7 +108,7 @@ public Builder setOffset(long off) {
public KsmKeyLocationInfo build() {
return new KsmKeyLocationInfo(containerName, blockID,
shouldCreateContainer, index, length, offset);
shouldCreateContainer, length, offset);
}
}
@ -123,19 +117,20 @@ public KeyLocation getProtobuf() {
.setContainerName(containerName)
.setBlockID(blockID)
.setShouldCreateContainer(shouldCreateContainer)
.setIndex(index)
.setLength(length)
.setOffset(offset)
.setCreateVersion(createVersion)
.build();
}
public static KsmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
return new KsmKeyLocationInfo(
KsmKeyLocationInfo info = new KsmKeyLocationInfo(
keyLocation.getContainerName(),
keyLocation.getBlockID(),
keyLocation.getShouldCreateContainer(),
keyLocation.getIndex(),
keyLocation.getLength(),
keyLocation.getOffset());
info.setCreateVersion(keyLocation.getCreateVersion());
return info;
}
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.ksm.helpers;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyLocationList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* A list of key locations. This class represents one single version of the
* blocks of a key.
*/
public class KsmKeyLocationInfoGroup {
private final long version;
private final List<KsmKeyLocationInfo> locationList;
public KsmKeyLocationInfoGroup(long version,
List<KsmKeyLocationInfo> locations) {
this.version = version;
this.locationList = locations;
}
/**
* Return only the blocks that are created in the most recent version.
*
* @return the list of blocks that are created in the latest version.
*/
public List<KsmKeyLocationInfo> getBlocksLatestVersionOnly() {
List<KsmKeyLocationInfo> list = new ArrayList<>();
locationList.stream().filter(x -> x.getCreateVersion() == version)
.forEach(list::add);
return list;
}
public long getVersion() {
return version;
}
public List<KsmKeyLocationInfo> getLocationList() {
return locationList;
}
public KeyLocationList getProtobuf() {
return KeyLocationList.newBuilder()
.setVersion(version)
.addAllKeyLocations(
locationList.stream().map(KsmKeyLocationInfo::getProtobuf)
.collect(Collectors.toList()))
.build();
}
public static KsmKeyLocationInfoGroup getFromProtobuf(
KeyLocationList keyLocationList) {
return new KsmKeyLocationInfoGroup(
keyLocationList.getVersion(),
keyLocationList.getKeyLocationsList().stream()
.map(KsmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList()));
}
/**
* Given a new block location, generate a new version list based upon this
* one.
*
* @param newLocationList a list of new location to be added.
* @return
*/
KsmKeyLocationInfoGroup generateNextVersion(
List<KsmKeyLocationInfo> newLocationList) throws IOException {
// TODO : revisit if we can do this method more efficiently
// one potential inefficiency here is that later version always include
// older ones. e.g. v1 has B1, then v2, v3...will all have B1 and only add
// more
List<KsmKeyLocationInfo> newList = new ArrayList<>();
newList.addAll(locationList);
for (KsmKeyLocationInfo newInfo : newLocationList) {
// all these new blocks will have addVersion of current version + 1
newInfo.setCreateVersion(version + 1);
newList.add(newInfo);
}
return new KsmKeyLocationInfoGroup(version + 1, newList);
}
void appendNewBlocks(List<KsmKeyLocationInfo> newLocationList)
throws IOException {
for (KsmKeyLocationInfo info : newLocationList) {
info.setCreateVersion(version);
locationList.add(info);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("version:").append(version).append(" ");
for (KsmKeyLocationInfo kli : locationList) {
sb.append(kli.getBlockID()).append(" || ");
}
return sb.toString();
}
}

View File

@ -25,10 +25,19 @@
public class OpenKeySession {
private final int id;
private final KsmKeyInfo keyInfo;
// the version of the key when it is being opened in this session.
// a block that has a create version equals to open version means it will
// be committed only when this open session is closed.
private long openVersion;
public OpenKeySession(int id, KsmKeyInfo info) {
public OpenKeySession(int id, KsmKeyInfo info, long version) {
this.id = id;
this.keyInfo = info;
this.openVersion = version;
}
public long getOpenVersion() {
return this.openVersion;
}
public KsmKeyInfo getKeyInfo() {

View File

@ -538,7 +538,7 @@ public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
throw new IOException("Create key failed, error:" + resp.getStatus());
}
return new OpenKeySession(resp.getID(),
KsmKeyInfo.getFromProtobuf(resp.getKeyInfo()));
KsmKeyInfo.getFromProtobuf(resp.getKeyInfo()), resp.getOpenVersion());
}
@Override

View File

@ -235,7 +235,13 @@ message KeyLocation {
required bool shouldCreateContainer = 3;
required uint64 offset = 4;
required uint64 length = 5;
required uint32 index = 6;
// indicated at which version this block gets created.
optional uint64 createVersion = 6;
}
message KeyLocationList {
optional uint64 version = 1;
repeated KeyLocation keyLocations = 2;
}
message KeyInfo {
@ -243,9 +249,10 @@ message KeyInfo {
required string bucketName = 2;
required string keyName = 3;
required uint64 dataSize = 4;
repeated KeyLocation keyLocationList = 5;
repeated KeyLocationList keyLocationList = 5;
required uint64 creationTime = 6;
required uint64 modificationTime = 7;
optional uint64 latestVersion = 8;
}
message LocateKeyRequest {
@ -258,6 +265,8 @@ message LocateKeyResponse {
// clients' followup request may carry this ID for stateful operations (similar
// to a cookie).
optional uint32 ID = 3;
// TODO : allow specifiying a particular version to read.
optional uint64 openVersion = 4;
}
message SetBucketPropertyRequest {

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.common.BlockGroup;
@ -45,6 +46,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
@ -473,7 +475,11 @@ public List<BlockGroup> getPendingDeletionKeys(final int count)
KsmKeyInfo info =
KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
// Get block keys as a list.
List<String> item = info.getKeyLocationList().stream()
KsmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
if (latest == null) {
return Collections.emptyList();
}
List<String> item = latest.getLocationList().stream()
.map(KsmKeyLocationInfo::getBlockID)
.collect(Collectors.toList());
BlockGroup keyBlocks = BlockGroup.newBuilder()
@ -503,7 +509,8 @@ public List<BlockGroup> getExpiredOpenKeys() throws IOException {
continue;
}
// Get block keys as a list.
List<String> item = info.getKeyLocationList().stream()
List<String> item = info.getLatestVersionLocations()
.getBlocksLatestVersionOnly().stream()
.map(KsmKeyLocationInfo::getBlockID)
.collect(Collectors.toList());
BlockGroup keyBlocks = BlockGroup.newBuilder()

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.KeyInfo;
@ -40,6 +41,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ -195,9 +197,10 @@ public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setLength(scmBlockSize)
.setOffset(0)
.setIndex(keyInfo.getKeyLocationList().size())
.build();
keyInfo.appendKeyLocation(info);
// current version not committed, so new blocks coming now are added to
// the same version
keyInfo.appendNewBlocks(Collections.singletonList(info));
keyInfo.updateModifcationTime();
metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
return info;
@ -237,7 +240,6 @@ public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
// the point, if client needs more blocks, client can always call
// allocateBlock. But if requested size is not 0, KSM will preallocate
// some blocks and piggyback to client, to save RPC calls.
int idx = 0;
while (requestedSize > 0) {
long allocateSize = Math.min(scmBlockSize, requestedSize);
AllocatedBlock allocatedBlock =
@ -246,28 +248,45 @@ public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
.setContainerName(allocatedBlock.getPipeline().getContainerName())
.setBlockID(allocatedBlock.getKey())
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setIndex(idx++)
.setLength(allocateSize)
.setOffset(0)
.build();
locations.add(subKeyInfo);
requestedSize -= allocateSize;
}
long currentTime = Time.now();
// NOTE size of a key is not a hard limit on anything, it is a value that
// client should expect, in terms of current size of key. If client sets a
// value, then this value is used, otherwise, we allocate a single block
// which is the current size, if read by the client.
long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
KsmKeyInfo keyInfo = new KsmKeyInfo.Builder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setKsmKeyLocationInfos(locations)
.setCreationTime(currentTime)
.setModificationTime(currentTime)
.setDataSize(size)
.build();
byte[] keyKey = metadataManager.getDBKeyBytes(
volumeName, bucketName, keyName);
byte[] value = metadataManager.get(keyKey);
KsmKeyInfo keyInfo;
long openVersion;
if (value != null) {
// the key already exist, the new blocks will be added as new version
keyInfo = KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
// when locations.size = 0, the new version will have identical blocks
// as its previous version
openVersion = keyInfo.addNewVersion(locations);
keyInfo.setDataSize(size + keyInfo.getDataSize());
} else {
// the key does not exist, create a new object, the new blocks are the
// version 0
long currentTime = Time.now();
keyInfo = new KsmKeyInfo.Builder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setKsmKeyLocationInfos(Collections.singletonList(
new KsmKeyLocationInfoGroup(0, locations)))
.setCreationTime(currentTime)
.setModificationTime(currentTime)
.setDataSize(size)
.build();
openVersion = 0;
}
// Generate a random ID which is not already in meta db.
int id = -1;
// in general this should finish in a couple times at most. putting some
@ -285,7 +304,7 @@ public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
}
LOG.debug("Key {} allocated in volume {} bucket {}",
keyName, volumeName, bucketName);
return new OpenKeySession(id, keyInfo);
return new OpenKeySession(id, keyInfo, openVersion);
} catch (KSMException e) {
throw e;
} catch (IOException ex) {

View File

@ -334,6 +334,7 @@ public LocateKeyResponse createKey(
OpenKeySession openKey = impl.openKey(ksmKeyArgs);
resp.setKeyInfo(openKey.getKeyInfo().getProtobuf());
resp.setID(openKey.getId());
resp.setOpenVersion(openKey.getOpenVersion());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));

View File

@ -419,6 +419,9 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException,
.setType(xceiverClientManager.getType())
.setFactor(xceiverClientManager.getFactor())
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
return new OzoneOutputStream(groupOutputStream);
}

View File

@ -323,7 +323,7 @@ private Map<String, List<String>> createDeleteTXLog(DeletedBlockLog delLog,
// on datanodes.
Set<String> containerNames = new HashSet<>();
for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
entry.getValue().getKeyLocationList()
entry.getValue().getLatestVersionLocations().getLocationList()
.forEach(loc -> containerNames.add(loc.getContainerName()));
}
@ -331,7 +331,7 @@ private Map<String, List<String>> createDeleteTXLog(DeletedBlockLog delLog,
// total number of containerBlocks via creation call.
int totalCreatedBlocks = 0;
for (KsmKeyInfo info : keyLocations.values()) {
totalCreatedBlocks += info.getKeyLocationList().size();
totalCreatedBlocks += info.getKeyLocationVersions().size();
}
Assert.assertTrue(totalCreatedBlocks > 0);
Assert.assertEquals(totalCreatedBlocks,
@ -340,7 +340,8 @@ private Map<String, List<String>> createDeleteTXLog(DeletedBlockLog delLog,
// Create a deletion TX for each key.
Map<String, List<String>> containerBlocks = Maps.newHashMap();
for (KsmKeyInfo info : keyLocations.values()) {
List<KsmKeyLocationInfo> list = info.getKeyLocationList();
List<KsmKeyLocationInfo> list =
info.getLatestVersionLocations().getLocationList();
list.forEach(location -> {
if (containerBlocks.containsKey(location.getContainerName())) {
containerBlocks.get(location.getContainerName())

View File

@ -387,7 +387,8 @@ private boolean verifyRatisReplication(String volumeName, String bucketName,
OzoneProtos.ReplicationFactor replicationFactor =
OzoneProtos.ReplicationFactor.valueOf(factor.getValue());
KsmKeyInfo keyInfo = keySpaceManager.lookupKey(keyArgs);
for (KsmKeyLocationInfo info: keyInfo.getKeyLocationList()) {
for (KsmKeyLocationInfo info:
keyInfo.getLatestVersionLocations().getLocationList()) {
Pipeline pipeline =
storageContainerLocationClient.getContainer(info.getContainerName());
if ((pipeline.getFactor() != replicationFactor) ||

View File

@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* This class tests the versioning of blocks from KSM side.
*/
public class TestKsmBlockVersioning {
private static MiniOzoneCluster cluster = null;
private static UserArgs userArgs;
private static OzoneConfiguration conf;
private static KeySpaceManager keySpaceManager;
private static StorageHandler storageHandler;
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "distributed"
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
null, null, null, null);
keySpaceManager = cluster.getKeySpaceManager();
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testAllocateCommit() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
String keyName = "key" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setDataSize(1000)
.build();
// 1st update, version 0
OpenKeySession openKey = keySpaceManager.openKey(keyArgs);
keySpaceManager.commitKey(keyArgs, openKey.getId());
KsmKeyInfo keyInfo = keySpaceManager.lookupKey(keyArgs);
KsmKeyLocationInfoGroup highestVersion =
checkVersions(keyInfo.getKeyLocationVersions());
assertEquals(0, highestVersion.getVersion());
assertEquals(1, highestVersion.getLocationList().size());
// 2nd update, version 1
openKey = keySpaceManager.openKey(keyArgs);
//KsmKeyLocationInfo locationInfo =
// keySpaceManager.allocateBlock(keyArgs, openKey.getId());
keySpaceManager.commitKey(keyArgs, openKey.getId());
keyInfo = keySpaceManager.lookupKey(keyArgs);
highestVersion = checkVersions(keyInfo.getKeyLocationVersions());
assertEquals(1, highestVersion.getVersion());
assertEquals(2, highestVersion.getLocationList().size());
// 3rd update, version 2
openKey = keySpaceManager.openKey(keyArgs);
// this block will be appended to the latest version of version 2.
keySpaceManager.allocateBlock(keyArgs, openKey.getId());
keySpaceManager.commitKey(keyArgs, openKey.getId());
keyInfo = keySpaceManager.lookupKey(keyArgs);
highestVersion = checkVersions(keyInfo.getKeyLocationVersions());
assertEquals(2, highestVersion.getVersion());
assertEquals(4, highestVersion.getLocationList().size());
}
private KsmKeyLocationInfoGroup checkVersions(
List<KsmKeyLocationInfoGroup> versions) {
KsmKeyLocationInfoGroup currentVersion = null;
for (KsmKeyLocationInfoGroup version : versions) {
if (currentVersion != null) {
assertEquals(currentVersion.getVersion() + 1, version.getVersion());
for (KsmKeyLocationInfo info : currentVersion.getLocationList()) {
boolean found = false;
// all the blocks from the previous version must present in the next
// version
for (KsmKeyLocationInfo info2 : version.getLocationList()) {
if (info.getBlockID().equals(info2.getBlockID())) {
found = true;
break;
}
}
assertTrue(found);
}
}
currentVersion = version;
}
return currentVersion;
}
@Test
public void testReadLatestVersion() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
String keyName = "key" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setDataSize(1000)
.build();
String dataString = RandomStringUtils.randomAlphabetic(100);
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
// this write will create 1st version with one block
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
stream.write(dataString.getBytes());
}
byte[] data = new byte[dataString.length()];
try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
in.read(data);
}
KsmKeyInfo keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
assertEquals(dataString, DFSUtil.bytes2String(data));
assertEquals(0, keyInfo.getLatestVersionLocations().getVersion());
assertEquals(1,
keyInfo.getLatestVersionLocations().getLocationList().size());
// this write will create 2nd version, 2nd version will contain block from
// version 1, and add a new block
dataString = RandomStringUtils.randomAlphabetic(10);
data = new byte[dataString.length()];
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
stream.write(dataString.getBytes());
}
try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
in.read(data);
}
keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
assertEquals(dataString, DFSUtil.bytes2String(data));
assertEquals(1, keyInfo.getLatestVersionLocations().getVersion());
assertEquals(2,
keyInfo.getLatestVersionLocations().getLocationList().size());
dataString = RandomStringUtils.randomAlphabetic(200);
data = new byte[dataString.length()];
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
stream.write(dataString.getBytes());
}
try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
in.read(data);
}
keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
assertEquals(dataString, DFSUtil.bytes2String(data));
assertEquals(2, keyInfo.getLatestVersionLocations().getVersion());
assertEquals(3,
keyInfo.getLatestVersionLocations().getLocationList().size());
}
}

View File

@ -618,7 +618,8 @@ public void testDeleteKey() throws Exception {
// Memorize chunks that has been created,
// so we can verify actual deletions at DN side later.
for (KsmKeyInfo keyInfo : createdKeys) {
List<KsmKeyLocationInfo> locations = keyInfo.getKeyLocationList();
List<KsmKeyLocationInfo> locations =
keyInfo.getLatestVersionLocations().getLocationList();
for (KsmKeyLocationInfo location : locations) {
String containerName = location.getContainerName();
KeyData keyData = new KeyData(containerName, location.getBlockID());