HDFS-12543. Ozone : allow create key without specifying size. Contributed by Chen Liang.

This commit is contained in:
Nandakumar 2017-10-06 12:04:35 +05:30 committed by Owen O'Malley
parent c97ce55d06
commit c3ef381011
25 changed files with 868 additions and 199 deletions

View File

@ -141,6 +141,11 @@ public final class OzoneConfigKeys {
public static final int OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT
= 300000; // 300s for default
public static final String OZONE_KEY_PREALLOCATION_MAXSIZE =
"ozone.key.preallocation.maxsize";
public static final long OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT
= 128 * OzoneConsts.MB;
public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
"ozone.block.deleting.limit.per.task";
public static final int OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT

View File

@ -95,6 +95,8 @@ public final class OzoneConsts {
public static final String OZONE_HANDLER_LOCAL = "local";
public static final String DELETING_KEY_PREFIX = "#deleting#";
public static final String OPEN_KEY_PREFIX = "#open#";
public static final String OPEN_KEY_ID_DELIMINATOR = "#";
/**
* KSM LevelDB prefixes.

View File

@ -20,8 +20,11 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
@ -56,53 +59,126 @@ public class ChunkGroupOutputStream extends OutputStream {
// array list's get(index) is O(1)
private final ArrayList<ChunkOutputStreamEntry> streamEntries;
private int currentStreamIndex;
private long totalSize;
private long byteOffset;
private final KeySpaceManagerProtocolClientSideTranslatorPB ksmClient;
private final
StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
private final KsmKeyArgs keyArgs;
private final int openID;
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
private final String requestID;
/**
* A constructor for testing purpose only.
*/
@VisibleForTesting
public ChunkGroupOutputStream() {
streamEntries = new ArrayList<>();
ksmClient = null;
scmClient = null;
keyArgs = null;
openID = -1;
xceiverClientManager = null;
chunkSize = 0;
requestID = null;
}
/**
* For testing purpose only. Not building output stream from blocks, but
* taking from externally.
*
* @param outputStream
* @param length
*/
@VisibleForTesting
public synchronized void addStream(OutputStream outputStream, long length) {
streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
}
public ChunkGroupOutputStream(
OpenKeySession handler, XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
int chunkSize, String requestId) throws IOException {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.totalSize = 0;
this.byteOffset = 0;
this.ksmClient = ksmClient;
this.scmClient = scmClient;
KsmKeyInfo info = handler.getKeyInfo();
this.keyArgs = new KsmKeyArgs.Builder()
.setVolumeName(info.getVolumeName())
.setBucketName(info.getBucketName())
.setKeyName(info.getKeyName())
.setDataSize(info.getDataSize()).build();
this.openID = handler.getId();
this.xceiverClientManager = xceiverClientManager;
this.chunkSize = chunkSize;
this.requestID = requestId;
LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationList().size());
// server may return any number of blocks, (0 to any)
int idx = 0;
for (KsmKeyLocationInfo subKeyInfo : info.getKeyLocationList()) {
subKeyInfo.setIndex(idx++);
checkKeyLocationInfo(subKeyInfo);
}
}
private void checkKeyLocationInfo(KsmKeyLocationInfo subKeyInfo)
throws IOException {
String containerKey = subKeyInfo.getBlockID();
String containerName = subKeyInfo.getContainerName();
Pipeline pipeline = scmClient.getContainer(containerName);
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
// create container if needed
if (subKeyInfo.getShouldCreateContainer()) {
try {
scmClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerName,
NotifyObjectCreationStageRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(xceiverClient, requestID);
scmClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerName,
NotifyObjectCreationStageRequestProto.Stage.complete);
} catch (StorageContainerException ex) {
if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
//container already exist, this should never happen
LOG.debug("Container {} already exists.", containerName);
} else {
LOG.error("Container creation failed for {}.", containerName, ex);
throw ex;
}
}
}
streamEntries.add(new ChunkOutputStreamEntry(containerKey,
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
chunkSize, subKeyInfo.getLength()));
}
@VisibleForTesting
public long getByteOffset() {
return byteOffset;
}
/**
* Append another stream to the end of the list. Note that the streams are not
* actually created to this point, only enough meta data about the stream is
* stored. When something is to be actually written to the stream, the stream
* will be created (if not already).
*
* @param containerKey the key to store in the container
* @param key the ozone key
* @param xceiverClientManager xceiver manager instance
* @param xceiverClient xceiver manager instance
* @param requestID the request id
* @param chunkSize the chunk size for this key chunks
* @param length the total length of this key
*/
public synchronized void addStream(String containerKey, String key,
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
String requestID, int chunkSize, long length) {
streamEntries.add(new ChunkOutputStreamEntry(containerKey, key,
xceiverClientManager, xceiverClient, requestID, chunkSize, length));
totalSize += length;
}
@VisibleForTesting
public synchronized void addStream(OutputStream outputStream, long length) {
streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
totalSize += length;
}
@Override
public synchronized void write(int b) throws IOException {
if (streamEntries.size() <= currentStreamIndex) {
throw new IndexOutOfBoundsException();
Preconditions.checkNotNull(ksmClient);
// allocate a new block, if a exception happens, log an error and
// throw exception to the caller directly, and the write fails.
try {
allocateNewBlock(currentStreamIndex);
} catch (IOException ioe) {
LOG.error("Allocate block fail when writing.");
throw ioe;
}
}
ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
entry.write(b);
@ -137,15 +213,21 @@ public class ChunkGroupOutputStream extends OutputStream {
if (len == 0) {
return;
}
if (streamEntries.size() <= currentStreamIndex) {
throw new IOException("Write out of stream range! stream index:" +
currentStreamIndex);
}
if (totalSize - byteOffset < len) {
throw new IOException("Can not write " + len + " bytes with only " +
(totalSize - byteOffset) + " byte space");
}
int succeededAllocates = 0;
while (len > 0) {
if (streamEntries.size() <= currentStreamIndex) {
Preconditions.checkNotNull(ksmClient);
// allocate a new block, if a exception happens, log an error and
// throw exception to the caller directly, and the write fails.
try {
allocateNewBlock(currentStreamIndex);
succeededAllocates += 1;
} catch (IOException ioe) {
LOG.error("Try to allocate more blocks for write failed, already " +
"allocated " + succeededAllocates + " blocks for this write.");
throw ioe;
}
}
// in theory, this condition should never violate due the check above
// still do a sanity check.
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
@ -161,6 +243,21 @@ public class ChunkGroupOutputStream extends OutputStream {
}
}
/**
* Contact KSM to get a new block. Set the new block with the index (e.g.
* first block has index = 0, second has index = 1 etc.)
*
* The returned block is made to new ChunkOutputStreamEntry to write.
*
* @param index the index of the block.
* @throws IOException
*/
private void allocateNewBlock(int index) throws IOException {
KsmKeyLocationInfo subKeyInfo = ksmClient.allocateBlock(keyArgs, openID);
subKeyInfo.setIndex(index);
checkKeyLocationInfo(subKeyInfo);
}
@Override
public synchronized void flush() throws IOException {
for (int i = 0; i <= currentStreamIndex; i++) {
@ -168,10 +265,73 @@ public class ChunkGroupOutputStream extends OutputStream {
}
}
/**
* Commit the key to KSM, this will add the blocks as the new key blocks.
*
* @throws IOException
*/
@Override
public synchronized void close() throws IOException {
for (ChunkOutputStreamEntry entry : streamEntries) {
entry.close();
if (entry != null) {
entry.close();
}
}
if (keyArgs != null) {
// in test, this could be null
keyArgs.setDataSize(byteOffset);
ksmClient.commitKey(keyArgs, openID);
} else {
LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
}
}
/**
* Builder class of ChunkGroupOutputStream.
*/
public static class Builder {
private OpenKeySession openHandler;
private XceiverClientManager xceiverManager;
private StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
private KeySpaceManagerProtocolClientSideTranslatorPB ksmClient;
private int chunkSize;
private String requestID;
public Builder setHandler(OpenKeySession handler) {
this.openHandler = handler;
return this;
}
public Builder setXceiverClientManager(XceiverClientManager manager) {
this.xceiverManager = manager;
return this;
}
public Builder setScmClient(
StorageContainerLocationProtocolClientSideTranslatorPB client) {
this.scmClient = client;
return this;
}
public Builder setKsmClient(
KeySpaceManagerProtocolClientSideTranslatorPB client) {
this.ksmClient = client;
return this;
}
public Builder setChunkSize(int size) {
this.chunkSize = size;
return this;
}
public Builder setRequestID(String id) {
this.requestID = id;
return this;
}
public ChunkGroupOutputStream build() throws IOException {
return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
ksmClient, chunkSize, requestID);
}
}
@ -266,56 +426,4 @@ public class ChunkGroupOutputStream extends OutputStream {
}
}
}
public static ChunkGroupOutputStream getFromKsmKeyInfo(
KsmKeyInfo keyInfo, XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient,
int chunkSize, String requestId) throws IOException {
// TODO: the following createContainer and key writes may fail, in which
// case we should revert the above allocateKey to KSM.
// check index as sanity check
int index = 0;
String blockID;
ChunkGroupOutputStream groupOutputStream = new ChunkGroupOutputStream();
for (KsmKeyLocationInfo subKeyInfo : keyInfo.getKeyLocationList()) {
blockID = subKeyInfo.getBlockID();
Preconditions.checkArgument(index++ == subKeyInfo.getIndex());
String containerName = subKeyInfo.getContainerName();
Pipeline pipeline =
storageContainerLocationClient.getContainer(containerName);
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
// create container if needed
if (subKeyInfo.getShouldCreateContainer()) {
try {
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerName,
NotifyObjectCreationStageRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerName,
NotifyObjectCreationStageRequestProto.Stage.complete);
} catch (StorageContainerException ex) {
if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
//container already exist, this should never happen
LOG.debug("Container {} already exists.", containerName);
} else {
LOG.error("Container creation failed for {}.", containerName, ex);
throw ex;
}
}
}
groupOutputStream.addStream(blockID, keyInfo.getKeyName(),
xceiverClientManager, xceiverClient, requestId, chunkSize,
subKeyInfo.getLength());
}
return groupOutputStream;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.ksm.protocolPB
.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.ksm.protocolPB
@ -444,10 +445,16 @@ public class RpcClient implements ClientProtocol {
.setFactor(factor)
.build();
KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
OpenKeySession openKey = keySpaceManagerClient.openKey(keyArgs);
ChunkGroupOutputStream groupOutputStream =
ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
storageContainerLocationClient, chunkSize, requestId);
new ChunkGroupOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setScmClient(storageContainerLocationClient)
.setKsmClient(keySpaceManagerClient)
.setChunkSize(chunkSize)
.setRequestID(requestId)
.build();
return new OzoneOutputStream(groupOutputStream);
}

View File

@ -27,7 +27,7 @@ public final class KsmKeyArgs {
private final String volumeName;
private final String bucketName;
private final String keyName;
private final long dataSize;
private long dataSize;
private final ReplicationType type;
private final ReplicationFactor factor;
@ -65,6 +65,10 @@ public final class KsmKeyArgs {
return dataSize;
}
public void setDataSize(long size) {
dataSize = size;
}
/**
* Builder class of KsmKeyArgs.
*/

View File

@ -32,7 +32,7 @@ public final class KsmKeyInfo {
private final String bucketName;
// name of key client specified
private final String keyName;
private final long dataSize;
private long dataSize;
private List<KsmKeyLocationInfo> keyLocationList;
private final long creationTime;
private final long modificationTime;
@ -65,10 +65,18 @@ public final class KsmKeyInfo {
return dataSize;
}
public void setDataSize(long size) {
this.dataSize = size;
}
public List<KsmKeyLocationInfo> getKeyLocationList() {
return keyLocationList;
}
public void appendKeyLocation(KsmKeyLocationInfo newLocation) {
keyLocationList.add(newLocation);
}
public long getCreationTime() {
return creationTime;
}

View File

@ -28,7 +28,7 @@ public final class KsmKeyLocationInfo {
private final String blockID;
private final boolean shouldCreateContainer;
// the id of this subkey in all the subkeys.
private final int index;
private int index;
private final long length;
private final long offset;
@ -59,6 +59,10 @@ public final class KsmKeyLocationInfo {
return index;
}
public void setIndex(int idx) {
index = idx;
}
public long getLength() {
return length;
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.ksm.helpers;
/**
* This class represents a open key "session". A session here means a key is
* opened by a specific client, the client sends the handler to server, such
* that servers can recognize this client, and thus know how to close the key.
*/
public class OpenKeySession {
private final int id;
private final KsmKeyInfo keyInfo;
public OpenKeySession(int id, KsmKeyInfo info) {
this.id = id;
this.keyInfo = info;
}
public KsmKeyInfo getKeyInfo() {
return keyInfo;
}
public int getId() {
return id;
}
}

View File

@ -21,7 +21,9 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import java.io.IOException;
@ -129,13 +131,35 @@ public interface KeySpaceManagerProtocol {
void setBucketProperty(KsmBucketArgs args) throws IOException;
/**
* Allocate a block to a container, the block is returned to the client.
* Open the given key and return an open key session.
*
* @param args the args of the key.
* @return KsmKeyInfo isntacne that client uses to talk to container.
* @return OpenKeySession instance that client uses to talk to container.
* @throws IOException
*/
KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
OpenKeySession openKey(KsmKeyArgs args) throws IOException;
/**
* Commit a key. This will make the change from the client visible. The client
* is identified by the clientID.
*
* @param args the key to commit
* @param clientID the client identification
* @throws IOException
*/
void commitKey(KsmKeyArgs args, int clientID) throws IOException;
/**
* Allocate a new block, it is assumed that the client is having an open key
* session going on. This block will be appended to this open key session.
*
* @param args the key to append
* @param clientID the client identification
* @return an allocated block
* @throws IOException
*/
KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
throws IOException;
/**
* Look up for the container of an existing key.

View File

@ -28,8 +28,18 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CommitKeyResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.BucketArgs;
import org.apache.hadoop.ozone.protocol.proto
@ -498,24 +508,23 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
}
/**
* Allocate a block for a key, then use the returned meta info to talk to data
* node to actually write the key.
* Create a new open session of the key, then use the returned meta info to
* talk to data node to actually write the key.
* @param args the args for the key to be allocated
* @return a handler to the key, returned client
* @throws IOException
*/
@Override
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize())
.setType(args.getType())
.setFactor(args.getFactor())
.build();
req.setKeyArgs(keyArgs);
.setKeyName(args.getKeyName());
if (args.getDataSize() > 0) {
keyArgs.setDataSize(args.getDataSize());
}
req.setKeyArgs(keyArgs.build());
final LocateKeyResponse resp;
try {
@ -524,12 +533,62 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Create key failed, error:" +
throw new IOException("Create key failed, error:" + resp.getStatus());
}
return new OpenKeySession(resp.getID(),
KsmKeyInfo.getFromProtobuf(resp.getKeyInfo()));
}
@Override
public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
throws IOException {
AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize()).build();
req.setKeyArgs(keyArgs);
req.setClientID(clientID);
final AllocateBlockResponse resp;
try {
resp = rpcProxy.allocateBlock(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Allocate block failed, error:" +
resp.getStatus());
}
return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
return KsmKeyLocationInfo.getFromProtobuf(resp.getKeyLocation());
}
@Override
public void commitKey(KsmKeyArgs args, int clientID)
throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize()).build();
req.setKeyArgs(keyArgs);
req.setClientID(clientID);
final CommitKeyResponse resp;
try {
resp = rpcProxy.commitKey(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Commit key failed, error:" +
resp.getStatus());
}
}
@Override
public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();

View File

@ -255,6 +255,9 @@ message LocateKeyRequest {
message LocateKeyResponse {
required Status status = 1;
optional KeyInfo keyInfo = 2;
// clients' followup request may carry this ID for stateful operations (similar
// to a cookie).
optional uint32 ID = 3;
}
message SetBucketPropertyRequest {
@ -287,6 +290,25 @@ message ListKeysResponse {
repeated KeyInfo keyInfo = 2;
}
message AllocateBlockRequest {
required KeyArgs keyArgs = 1;
required uint32 clientID = 2;
}
message AllocateBlockResponse {
required Status status = 1;
required KeyLocation keyLocation = 2;
}
message CommitKeyRequest {
required KeyArgs keyArgs = 1;
required uint32 clientID = 2;
}
message CommitKeyResponse {
required Status status = 1;
}
/**
The KSM service that takes care of Ozone namespace.
*/
@ -380,4 +402,16 @@ service KeySpaceManagerService {
*/
rpc listKeys(ListKeysRequest)
returns(ListKeysResponse);
/**
Commit a key.
*/
rpc commitKey(CommitKeyRequest)
returns(CommitKeyResponse);
/**
Allocate a new block for a key.
*/
rpc allocateBlock(AllocateBlockRequest)
returns(AllocateBlockResponse);
}

View File

@ -125,6 +125,27 @@ public interface KSMMetadataManager {
*/
byte[] getDeletedKeyName(byte[] keyName);
/**
* Returns the DB key name of a open key in KSM metadata store.
* Should be #open# prefix followed by actual key name.
* @param keyName - key name
* @param id - the id for this open
* @return bytes of DB key.
*/
byte[] getOpenKeyNameBytes(String keyName, int id);
/**
* Returns the full name of a key given volume name, bucket name and key name.
* Generally done by padding certain delimiters.
*
* @param volumeName - volume name
* @param bucketName - bucket name
* @param keyName - key name
* @return the full key name.
*/
String getKeyWithDBPrefix(String volumeName, String bucketName,
String keyName);
/**
* Given a volume, check if it is empty,
* i.e there are no buckets inside it.

View File

@ -53,6 +53,8 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
@ -153,7 +155,8 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
return sb.toString();
}
private String getKeyWithDBPrefix(String volume, String bucket, String key) {
@Override
public String getKeyWithDBPrefix(String volume, String bucket, String key) {
String keyVB = OzoneConsts.KSM_KEY_PREFIX + volume
+ OzoneConsts.KSM_KEY_PREFIX + bucket
+ OzoneConsts.KSM_KEY_PREFIX;
@ -171,6 +174,12 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
}
@Override
public byte[] getOpenKeyNameBytes(String keyName, int id) {
return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id +
OPEN_KEY_ID_DELIMINATOR + keyName);
}
/**
* Returns the read lock used on Metadata DB.
* @return readLock

View File

@ -56,6 +56,8 @@ public class KSMMetrics {
private @Metric MutableCounterLong numBucketLists;
private @Metric MutableCounterLong numKeyLists;
private @Metric MutableCounterLong numVolumeLists;
private @Metric MutableCounterLong numKeyCommits;
private @Metric MutableCounterLong numAllocateBlockCalls;
// Failure Metrics
private @Metric MutableCounterLong numVolumeCreateFails;
@ -73,6 +75,8 @@ public class KSMMetrics {
private @Metric MutableCounterLong numBucketListFails;
private @Metric MutableCounterLong numKeyListFails;
private @Metric MutableCounterLong numVolumeListFails;
private @Metric MutableCounterLong numKeyCommitFails;
private @Metric MutableCounterLong numBlockAllocateCallFails;
public KSMMetrics() {
}
@ -207,6 +211,23 @@ public class KSMMetrics {
numKeyDeletes.incr();
}
public void incNumKeyCommits() {
numKeyOps.incr();
numKeyCommits.incr();
}
public void incNumKeyCommitFails() {
numKeyCommitFails.incr();
}
public void incNumBlockAllocateCalls() {
numAllocateBlockCalls.incr();
}
public void incNumBlockAllocateCallFails() {
numBlockAllocateCallFails.incr();
}
public void incNumBucketListFails() {
numBucketListFails.incr();
}
@ -369,6 +390,26 @@ public class KSMMetrics {
return numVolumeListFails.value();
}
@VisibleForTesting
public long getNumKeyCommits() {
return numKeyCommits.value();
}
@VisibleForTesting
public long getNumKeyCommitFails() {
return numKeyCommitFails.value();
}
@VisibleForTesting
public long getNumBlockAllocates() {
return numAllocateBlockCalls.value();
}
@VisibleForTesting
public long getNumBlockAllocateFails() {
return numBlockAllocateCallFails.value();
}
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import java.io.IOException;
import java.util.List;
@ -39,22 +41,39 @@ public interface KeyManager {
void stop() throws IOException;
/**
* Given the args of a key to put, return a pipeline for the key. Writes
* the key to pipeline mapping to meta data.
* After calling commit, the key will be made visible. There can be multiple
* open key writes in parallel (identified by client id). The most recently
* committed one will be the one visible.
*
* Note that this call only allocate a block for key, and adds the
* corresponding entry to metadata. The block will be returned to client side
* handler DistributedStorageHandler. Which will make another call to
* datanode to create container (if needed) and writes the key.
* @param args the key to commit.
* @param clientID the client that is committing.
* @throws IOException
*/
void commitKey(KsmKeyArgs args, int clientID) throws IOException;
/**
* A client calls this on an open key, to request to allocate a new block,
* and appended to the tail of current block list of the open client.
*
* @param args the key to append
* @param clientID the client requesting block.
* @return the reference to the new block.
* @throws IOException
*/
KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
throws IOException;
/**
* Given the args of a key to put, write an open key entry to meta data.
*
* In case that the container creation or key write failed on
* DistributedStorageHandler, this key's metadata will still stay in KSM.
* TODO garbage collect the open keys that never get closed
*
* @param args the args of the key provided by client.
* @return a KsmKeyInfo instance client uses to talk to container.
* @return a OpenKeySession instance client uses to talk to container.
* @throws Exception
*/
KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
OpenKeySession openKey(KsmKeyArgs args) throws IOException;
/**
* Look up an existing key. Return the info of the key to client side, which

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
@ -47,6 +49,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVI
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
@ -69,6 +73,9 @@ public class KeyManagerImpl implements KeyManager {
private final boolean useRatis;
private final BackgroundService keyDeletingService;
private final long preallocateMax;
private final Random random;
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
KSMMetadataManager metadataManager, OzoneConfiguration conf) {
this.scmBlockClient = scmBlockClient;
@ -83,8 +90,12 @@ public class KeyManagerImpl implements KeyManager {
long serviceTimeout = conf.getTimeDuration(
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
this.preallocateMax = conf.getLong(
OZONE_KEY_PREALLOCATION_MAXSIZE,
OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
keyDeletingService = new KeyDeletingService(
scmBlockClient, this, svcInterval, serviceTimeout, conf);
random = new Random();
}
@Override
@ -97,8 +108,28 @@ public class KeyManagerImpl implements KeyManager {
keyDeletingService.shutdown();
}
private void validateBucket(String volumeName, String bucketName)
throws IOException {
byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
//Check if the volume exists
if(metadataManager.get(volumeKey) == null) {
LOG.error("volume not found: {}", volumeName);
throw new KSMException("Volume not found",
KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
//Check if bucket already exists
if(metadataManager.get(bucketKey) == null) {
LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
throw new KSMException("Bucket not found",
KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
}
}
@Override
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
@ -118,80 +149,162 @@ public class KeyManagerImpl implements KeyManager {
}
try {
byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
byte[] keyKey =
metadataManager.getDBKeyBytes(volumeName, bucketName, keyName);
//Check if the volume exists
if (metadataManager.get(volumeKey) == null) {
LOG.debug("volume not found: {}", volumeName);
throw new KSMException("Volume not found",
KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
//Check if bucket already exists
if (metadataManager.get(bucketKey) == null) {
LOG.debug("bucket not found: {}/{} ", volumeName, bucketName);
throw new KSMException("Bucket not found",
KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
validateBucket(volumeName, bucketName);
String objectKey = metadataManager.getKeyWithDBPrefix(
volumeName, bucketName, keyName);
byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID);
byte[] keyData = metadataManager.get(openKey);
if (keyData == null) {
LOG.error("Allocate block for a key not in open status in meta store " +
objectKey + " with ID " + clientID);
throw new KSMException("Open Key not found",
KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
}
AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(scmBlockSize, type, factor);
KsmKeyInfo keyInfo =
KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
KsmKeyLocationInfo info = new KsmKeyLocationInfo.Builder()
.setContainerName(allocatedBlock.getPipeline().getContainerName())
.setBlockID(allocatedBlock.getKey())
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setLength(scmBlockSize)
.setOffset(0)
.setIndex(keyInfo.getKeyLocationList().size())
.build();
keyInfo.appendKeyLocation(info);
metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
return info;
} finally {
metadataManager.writeLock().unlock();
}
}
// TODO: Garbage collect deleted blocks due to overwrite of a key.
// FIXME: BUG: Please see HDFS-11922.
// If user overwrites a key, then we are letting it pass without
// corresponding process.
// In reality we need to garbage collect those blocks by telling SCM to
// clean up those blocks when it can. Right now making this change
// allows us to pass tests that expect ozone can overwrite a key.
@Override
public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
ReplicationFactor factor = args.getFactor();
ReplicationType type = args.getType();
// When we talk to SCM make sure that we ask for at least a byte in the
// block. This way even if the call is for a zero length key, we back it
// with a actual SCM block.
// TODO : Review this decision later. We can get away with only a
// metadata entry in case of 0 length key.
long targetSize = args.getDataSize();
List<KsmKeyLocationInfo> subKeyInfos = new ArrayList<>();
// If user does not specify a replication strategy or
// replication factor, KSM will use defaults.
if(factor == null) {
factor = useRatis ? ReplicationFactor.THREE: ReplicationFactor.ONE;
}
if(type == null) {
type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
}
try {
validateBucket(volumeName, bucketName);
long requestedSize = Math.min(preallocateMax, args.getDataSize());
List<KsmKeyLocationInfo> locations = new ArrayList<>();
String objectKey = metadataManager.getKeyWithDBPrefix(
volumeName, bucketName, keyName);
// requested size is not required but more like a optimization:
// SCM looks at the requested, if it 0, no block will be allocated at
// the point, if client needs more blocks, client can always call
// allocateBlock. But if requested size is not 0, KSM will preallocate
// some blocks and piggyback to client, to save RPC calls.
int idx = 0;
long offset = 0;
// in case targetSize == 0, subKeyInfos will be an empty list
while (targetSize > 0) {
long allocateSize = Math.min(targetSize, scmBlockSize);
while (requestedSize > 0) {
long allocateSize = Math.min(scmBlockSize, requestedSize);
AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(allocateSize, type, factor);
KsmKeyLocationInfo subKeyInfo = new KsmKeyLocationInfo.Builder()
.setContainerName(allocatedBlock.getPipeline().getContainerName())
.setBlockID(allocatedBlock.getKey())
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setIndex(idx)
.setIndex(idx++)
.setLength(allocateSize)
.setOffset(offset)
.setOffset(0)
.build();
idx += 1;
offset += allocateSize;
targetSize -= allocateSize;
subKeyInfos.add(subKeyInfo);
locations.add(subKeyInfo);
requestedSize -= allocateSize;
}
long currentTime = Time.now();
KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
// NOTE size of a key is not a hard limit on anything, it is a value that
// client should expect, in terms of current size of key. If client sets a
// value, then this value is used, otherwise, we allocate a single block
// which is the current size, if read by the client.
long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
KsmKeyInfo keyInfo = new KsmKeyInfo.Builder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize())
.setKsmKeyLocationInfos(subKeyInfos)
.setKsmKeyLocationInfos(locations)
.setCreationTime(currentTime)
.setModificationTime(currentTime)
.setDataSize(size)
.build();
metadataManager.put(keyKey, keyBlock.getProtobuf().toByteArray());
LOG.debug("Key {} allocated in volume {} bucket {}", keyName, volumeName,
bucketName);
return keyBlock;
// Generate a random ID which is not already in meta db.
int id = -1;
// in general this should finish in a couple times at most. putting some
// arbitrary large number here to avoid dead loop.
for (int j = 0; j < 10000; j++) {
id = random.nextInt();
byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, id);
if (metadataManager.get(openKey) == null) {
metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
break;
}
}
if (id == -1) {
throw new IOException("Failed to find a usable id for " + objectKey);
}
LOG.debug("Key {} allocated in volume {} bucket {}",
keyName, volumeName, bucketName);
return new OpenKeySession(id, keyInfo);
} catch (KSMException e) {
throw e;
} catch (IOException ex) {
if (!(ex instanceof KSMException)) {
LOG.error("Key allocation failed for volume:{} bucket:{} key:{}",
LOG.error("Key open failed for volume:{} bucket:{} key:{}",
volumeName, bucketName, keyName, ex);
}
throw new KSMException(ex.getMessage(),
KSMException.ResultCodes.FAILED_KEY_ALLOCATION);
} finally {
metadataManager.writeLock().unlock();
}
}
@Override
public void commitKey(KsmKeyArgs args, int clientID) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
try {
validateBucket(volumeName, bucketName);
String objectKey = metadataManager.getKeyWithDBPrefix(
volumeName, bucketName, keyName);
byte[] objectKeyBytes = metadataManager.getDBKeyBytes(volumeName,
bucketName, keyName);
byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID);
byte[] openKeyData = metadataManager.get(openKey);
if (openKeyData == null) {
throw new KSMException("Commit a key without corresponding entry " +
DFSUtil.bytes2String(openKey), ResultCodes.FAILED_KEY_NOT_FOUND);
}
KsmKeyInfo keyInfo =
KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
keyInfo.setDataSize(args.getDataSize());
BatchOperation batch = new BatchOperation();
batch.delete(openKey);
batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray());
metadataManager.writeBatch(batch);
} catch (KSMException e) {
throw e;
} catch (IOException ex) {
if (!(ex instanceof KSMException)) {
LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
volumeName, bucketName, keyName, ex);
}
throw new KSMException(ex.getMessage(),

View File

@ -28,7 +28,9 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -466,16 +468,40 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
* @throws IOException
*/
@Override
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
try {
metrics.incNumKeyAllocates();
return keyManager.allocateKey(args);
return keyManager.openKey(args);
} catch (Exception ex) {
metrics.incNumKeyAllocateFails();
throw ex;
}
}
@Override
public void commitKey(KsmKeyArgs args, int clientID)
throws IOException {
try {
metrics.incNumKeyCommits();
keyManager.commitKey(args, clientID);
} catch (Exception ex) {
metrics.incNumKeyCommitFails();
throw ex;
}
}
@Override
public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
throws IOException {
try {
metrics.incNumBlockAllocateCalls();
return keyManager.allocateBlock(args, clientID);
} catch (Exception ex) {
metrics.incNumBlockAllocateCallFails();
throw ex;
}
}
/**
* Lookup a key.
*

View File

@ -23,10 +23,20 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CommitKeyResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateBucketRequest;
import org.apache.hadoop.ozone.protocol.proto
@ -81,6 +91,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.List
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -303,16 +314,26 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
LocateKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OzoneProtos.ReplicationType type =
keyArgs.hasType()? keyArgs.getType() : null;
OzoneProtos.ReplicationFactor factor =
keyArgs.hasFactor()? keyArgs.getFactor() : null;
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setDataSize(keyArgs.getDataSize())
.setType(keyArgs.getType())
.setFactor(keyArgs.getFactor())
.setType(type)
.setFactor(factor)
.build();
KsmKeyInfo keyInfo = impl.allocateKey(ksmKeyArgs);
resp.setKeyInfo(keyInfo.getProtobuf());
if (keyArgs.hasDataSize()) {
ksmKeyArgs.setDataSize(keyArgs.getDataSize());
} else {
ksmKeyArgs.setDataSize(0);
}
OpenKeySession openKey = impl.openKey(ksmKeyArgs);
resp.setKeyInfo(openKey.getKeyInfo().getProtobuf());
resp.setID(openKey.getId());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
@ -332,7 +353,6 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setDataSize(keyArgs.getDataSize())
.build();
KsmKeyInfo keyInfo = impl.lookupKey(ksmKeyArgs);
resp.setKeyInfo(keyInfo.getProtobuf());
@ -436,4 +456,47 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
}
return resp.build();
}
@Override
public CommitKeyResponse commitKey(RpcController controller,
CommitKeyRequest request) throws ServiceException {
CommitKeyResponse.Builder resp =
CommitKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
int id = request.getClientID();
impl.commitKey(ksmKeyArgs, id);
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
@Override
public AllocateBlockResponse allocateBlock(RpcController controller,
AllocateBlockRequest request) throws ServiceException {
AllocateBlockResponse.Builder resp =
AllocateBlockResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
int id = request.getClientID();
KsmKeyLocationInfo newLocation = impl.allocateBlock(ksmKeyArgs, id);
resp.setKeyLocation(newLocation.getProtobuf());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.ksm.protocolPB
.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.conf.OzoneConfiguration;
@ -409,10 +410,16 @@ public final class DistributedStorageHandler implements StorageHandler {
.setFactor(xceiverClientManager.getFactor())
.build();
// contact KSM to allocate a block for key.
KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
OpenKeySession openKey = keySpaceManagerClient.openKey(keyArgs);
ChunkGroupOutputStream groupOutputStream =
ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
storageContainerLocationClient, chunkSize, args.getRequestID());
new ChunkGroupOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setScmClient(storageContainerLocationClient)
.setKsmClient(keySpaceManagerClient)
.setChunkSize(chunkSize)
.setRequestID(args.getRequestID())
.build();
return new OzoneOutputStream(groupOutputStream);
}

View File

@ -1062,4 +1062,24 @@
exceed this count then the oldest idle connection is evicted.
</description>
</property>
<property>
<name>ozone.key.preallocation.maxsize</name>
<value>134217728</value>
<tag>OZONE, KSM, PERFORMANCE</tag>
<description>
When a new key write request is sent to KSM, if a size is requested, at most
128MB of size is allocated at request time. If client needs more space for the
write, separate block allocation requests will be made.
</description>
</property>
<property>
<name>ozone.client.list.cache</name>
<value>1000</value>
<tag>OZONE, PERFORMANCE</tag>
<description>
Configuration property to configure the cache size of client list calls.
</description>
</property>
</configuration>

View File

@ -25,7 +25,6 @@ import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
@ -103,9 +102,7 @@ public class TestChunkStreams {
// TODO : if we decide to take the 400 bytes instead in the future,
// other add more informative error code rather than exception, need to
// change this part.
exception.expect(IOException.class);
exception.expectMessage(
"Can not write 500 bytes with only 400 byte space");
exception.expect(Exception.class);
groupOutputStream.write(RandomStringUtils.randomAscii(500).getBytes());
assertEquals(100, groupOutputStream.getByteOffset());
}

View File

@ -176,7 +176,7 @@ public class TestKSMMetrcis {
.getInternalState(ksmManager, "keyManager");
KeyManager mockKm = Mockito.spy(bucketManager);
Mockito.doReturn(null).when(mockKm).allocateKey(null);
Mockito.doReturn(null).when(mockKm).openKey(null);
Mockito.doNothing().when(mockKm).deleteKey(null);
Mockito.doReturn(null).when(mockKm).lookupKey(null);
Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0);
@ -192,7 +192,7 @@ public class TestKSMMetrcis {
assertCounter("NumKeyLists", 1L, ksmMetrics);
// inject exception to test for Failure Metrics
Mockito.doThrow(exception).when(mockKm).allocateKey(null);
Mockito.doThrow(exception).when(mockKm).openKey(null);
Mockito.doThrow(exception).when(mockKm).deleteKey(null);
Mockito.doThrow(exception).when(mockKm).lookupKey(null);
Mockito.doThrow(exception).when(mockKm).listKeys(
@ -284,7 +284,7 @@ public class TestKSMMetrcis {
*/
private void doKeyOps() {
try {
ksmManager.allocateKey(null);
ksmManager.openKey(null);
} catch (IOException ignored) {
}

View File

@ -232,14 +232,15 @@ public class TestKSMSQLCli {
sql = "SELECT * FROM keyInfo";
rs = executeQuery(conn, sql);
HashMap<String, List<String>> expectedMap2 = new HashMap<>();
// no data written, data size will be 0
expectedMap2.put(keyName0,
Arrays.asList(volumeName0, bucketName0, Integer.toString(100)));
Arrays.asList(volumeName0, bucketName0, "0"));
expectedMap2.put(keyName1,
Arrays.asList(volumeName1, bucketName1, Integer.toString(200)));
Arrays.asList(volumeName1, bucketName1, "0"));
expectedMap2.put(keyName2,
Arrays.asList(volumeName0, bucketName2, Integer.toString(300)));
Arrays.asList(volumeName0, bucketName2, "0"));
expectedMap2.put(keyName3,
Arrays.asList(volumeName0, bucketName2, Integer.toString(400)));
Arrays.asList(volumeName0, bucketName2, "0"));
while (rs.next()) {
String volumeName = rs.getString("volumeName");
String bucketName = rs.getString("bucketName");

View File

@ -990,6 +990,57 @@ public class TestKeySpaceManager {
Assert.assertTrue((OzoneUtils.formatDate(keyInfo.getModifiedOn())
/ 1000) >= (currentTime / 1000));
Assert.assertEquals(keyName, keyInfo.getKeyName());
Assert.assertEquals(4096, keyInfo.getSize());
// with out data written, the size would be 0
Assert.assertEquals(0, keyInfo.getSize());
}
/**
* Test that the write can proceed without having to set the right size.
*
* @throws IOException
*/
@Test
public void testWriteSize() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
String dataString = RandomStringUtils.randomAscii(100);
// write a key without specifying size at all
String keyName = "testKey";
KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
stream.write(dataString.getBytes());
}
byte[] data = new byte[dataString.length()];
try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
in.read(data);
}
Assert.assertEquals(dataString, DFSUtil.bytes2String(data));
// write a key with a size, but write above it.
String keyName1 = "testKey1";
KeyArgs keyArgs1 = new KeyArgs(keyName1, bucketArgs);
keyArgs1.setSize(30);
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs1)) {
stream.write(dataString.getBytes());
}
byte[] data1 = new byte[dataString.length()];
try (InputStream in = storageHandler.newKeyReader(keyArgs1)) {
in.read(data1);
}
Assert.assertEquals(dataString, DFSUtil.bytes2String(data1));
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -70,7 +71,7 @@ public class TestMultipleContainerReadWrite {
public static void init() throws Exception {
conf = new OzoneConfiguration();
// set to as small as 100 bytes per block.
conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 100);
conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 1);
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5);
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
@ -110,9 +111,9 @@ public class TestMultipleContainerReadWrite {
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
String dataString = RandomStringUtils.randomAscii(500);
String dataString = RandomStringUtils.randomAscii(3 * (int)OzoneConsts.MB);
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
keyArgs.setSize(500);
keyArgs.setSize(3 * (int)OzoneConsts.MB);
try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
outputStream.write(dataString.getBytes());
@ -126,10 +127,14 @@ public class TestMultipleContainerReadWrite {
// checking whether container meta data has the chunk file persisted.
MetricsRecordBuilder containerMetrics = getMetrics(
"StorageContainerMetrics");
assertCounter("numWriteChunk", 5L, containerMetrics);
assertCounter("numReadChunk", 5L, containerMetrics);
assertCounter("numWriteChunk", 3L, containerMetrics);
assertCounter("numReadChunk", 3L, containerMetrics);
}
// Disable this test, because this tests assumes writing beyond a specific
// size is not allowed. Which is not true for now. Keeping this test in case
// we add this restrict in the future.
@Ignore
@Test
public void testErrorWrite() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);