HDFS-11853. Ozone: KSM: Add getKey. Contributed by Chen Liang.

This commit is contained in:
Xiaoyu Yao 2017-05-30 15:15:55 -07:00 committed by Owen O'Malley
parent 59d273b175
commit 72b228a9e6
11 changed files with 295 additions and 37 deletions

View File

@ -117,7 +117,20 @@ KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
/**
* Allocate a block to a container, the block is returned to the client.
*
* @param args the args of the key.
* @return KsmKeyInfo isntacne that client uses to talk to container.
* @throws IOException
*/
KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
/**
* Look up for the container of an existing key.
*
* @param args the args of the key.
* @return KsmKeyInfo isntacne that client uses to talk to container.
* @throws IOException
*/
KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException;
}

View File

@ -42,9 +42,9 @@
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateKeyRequest;
.KeySpaceManagerProtocolProtos.LocateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateKeyResponse;
.KeySpaceManagerProtocolProtos.LocateKeyResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto
@ -330,7 +330,7 @@ public KsmBucketInfo getBucketInfo(String volume, String bucket)
*/
@Override
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder();
LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
@ -338,14 +338,37 @@ public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
.setDataSize(args.getDataSize()).build();
req.setKeyArgs(keyArgs);
final CreateKeyResponse resp;
final LocateKeyResponse resp;
try {
resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Get key block failed, error:" +
throw new IOException("Get key failed, error:" +
resp.getStatus());
}
return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
}
@Override
public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize()).build();
req.setKeyArgs(keyArgs);
final LocateKeyResponse resp;
try {
resp = rpcProxy.lookupKey(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Lookup key failed, error:" +
resp.getStatus());
}
return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());

View File

@ -48,8 +48,10 @@ enum Status {
BUCKET_NOT_FOUND = 8;
BUCKET_NOT_EMPTY = 9;
BUCKET_ALREADY_EXISTS = 10;
ACCESS_DENIED = 11;
INTERNAL_ERROR = 12;
KEY_ALREADY_EXISTS = 11;
KEY_NOT_FOUND = 12;
ACCESS_DENIED = 13;
INTERNAL_ERROR = 14;
}
@ -228,11 +230,11 @@ message KeyInfo {
required bool shouldCreateContainer = 7;
}
message CreateKeyRequest {
message LocateKeyRequest {
required KeyArgs keyArgs = 1;
}
message CreateKeyResponse {
message LocateKeyResponse {
required Status status = 1;
optional KeyInfo keyInfo = 2;
}
@ -290,8 +292,14 @@ service KeySpaceManagerService {
returns(InfoBucketResponse);
/**
Get key block.
Get key.
*/
rpc createKey(CreateKeyRequest)
returns(CreateKeyResponse);
rpc createKey(LocateKeyRequest)
returns(LocateKeyResponse);
/**
Look up for an existing key.
*/
rpc lookupKey(LocateKeyRequest)
returns(LocateKeyResponse);
}

View File

@ -33,7 +33,8 @@ public class KSMMetrics {
private @Metric MutableCounterLong numVolumeInfos;
private @Metric MutableCounterLong numBucketCreates;
private @Metric MutableCounterLong numBucketInfos;
private @Metric MutableCounterLong numKeyBlockAllocate;
private @Metric MutableCounterLong numKeyAllocate;
private @Metric MutableCounterLong numKeyLookup;
// Failure Metrics
private @Metric MutableCounterLong numVolumeCreateFails;
@ -41,7 +42,8 @@ public class KSMMetrics {
private @Metric MutableCounterLong numVolumeInfoFails;
private @Metric MutableCounterLong numBucketCreateFails;
private @Metric MutableCounterLong numBucketInfoFails;
private @Metric MutableCounterLong numKeyBlockAllocateFails;
private @Metric MutableCounterLong numKeyAllocateFails;
private @Metric MutableCounterLong numKeyLookupFails;
public KSMMetrics() {
}
@ -93,12 +95,20 @@ public void incNumBucketInfoFails() {
numBucketInfoFails.incr();
}
public void incNumKeyBlockAllocates() {
numKeyBlockAllocate.incr();
public void incNumKeyAllocates() {
numKeyAllocate.incr();
}
public void incNumKeyBlockAllocateFails() {
numKeyBlockAllocateFails.incr();
public void incNumKeyAllocateFails() {
numKeyAllocateFails.incr();
}
public void incNumKeyLookups() {
numKeyLookup.incr();
}
public void incNumKeyLookupFails() {
numKeyLookupFails.incr();
}
@VisibleForTesting
@ -152,12 +162,22 @@ public long getNumBucketInfoFails() {
}
@VisibleForTesting
public long getNumKeyBlockAllocates() {
return numKeyBlockAllocate.value();
public long getNumKeyAllocates() {
return numKeyAllocate.value();
}
@VisibleForTesting
public long getNumKeyBlockAllocateFailes() {
return numKeyBlockAllocateFails.value();
public long getNumKeyAllocateFails() {
return numKeyAllocateFails.value();
}
@VisibleForTesting
public long getNumKeyLookups() {
return numKeyLookup.value();
}
@VisibleForTesting
public long getNumKeyLookupFails() {
return numKeyLookupFails.value();
}
}

View File

@ -42,4 +42,14 @@ public interface KeyManager {
* @throws Exception
*/
KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
/**
* Look up an existing key. Return the info of the key to client side, which
* DistributedStorageHandler will use to access the data on datanode.
*
* @param args the args of the key provided by client.
* @return a KsmKeyInfo instance client uses to talk to container.
* @throws IOException
*/
KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException;
}

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.iq80.leveldb.DBException;
@ -106,4 +107,31 @@ public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
metadataManager.writeLock().unlock();
}
}
@Override
public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
try {
byte[] keyKey = metadataManager.getDBKeyForKey(
volumeName, bucketName, keyName);
byte[] value = metadataManager.get(keyKey);
if (value == null) {
LOG.error("Key: {} not found", keyKey);
throw new KSMException("Key not found",
KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
}
return KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
} catch (DBException ex) {
LOG.error("Get key failed for volume:{} bucket:{} key:{}",
volumeName, bucketName, keyName, ex);
throw new KSMException(ex.getMessage(),
KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
} finally {
metadataManager.writeLock().unlock();
}
}
}

View File

@ -408,18 +408,37 @@ public KsmBucketInfo getBucketInfo(String volume, String bucket)
}
/**
* Allocate a key block.
* Allocate a key.
*
* @param args - attributes of the key.
* @return
* @return KsmKeyInfo - the info about the allocated key.
* @throws IOException
*/
@Override
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
try {
metrics.incNumKeyBlockAllocates();
metrics.incNumKeyAllocates();
return keyManager.allocateKey(args);
} catch (Exception ex) {
metrics.incNumKeyBlockAllocateFails();
metrics.incNumKeyAllocateFails();
throw ex;
}
}
/**
* Lookup a key.
*
* @param args - attributes of the key.
* @return KsmKeyInfo - the info about the requested key.
* @throws IOException
*/
@Override
public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
try {
metrics.incNumKeyLookups();
return keyManager.lookupKey(args);
} catch (Exception ex) {
metrics.incNumKeyLookupFails();
throw ex;
}
}

View File

@ -103,6 +103,7 @@ public enum ResultCodes {
FAILED_BUCKET_ALREADY_EXISTS,
FAILED_BUCKET_NOT_FOUND,
FAILED_KEY_ALREADY_EXISTS,
FAILED_KEY_NOT_FOUND,
FAILED_INTERNAL_ERROR
}
}

View File

@ -38,9 +38,9 @@
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateKeyRequest;
.KeySpaceManagerProtocolProtos.LocateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateKeyResponse;
.KeySpaceManagerProtocolProtos.LocateKeyResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto
@ -105,6 +105,10 @@ private Status exceptionToResponseStatus(IOException ex) {
return Status.BUCKET_ALREADY_EXISTS;
case FAILED_BUCKET_NOT_FOUND:
return Status.BUCKET_NOT_FOUND;
case FAILED_KEY_ALREADY_EXISTS:
return Status.KEY_ALREADY_EXISTS;
case FAILED_KEY_NOT_FOUND:
return Status.KEY_NOT_FOUND;
default:
return Status.INTERNAL_ERROR;
}
@ -221,11 +225,11 @@ public InfoBucketResponse infoBucket(
}
@Override
public CreateKeyResponse createKey(
RpcController controller, CreateKeyRequest request
public LocateKeyResponse createKey(
RpcController controller, LocateKeyRequest request
) throws ServiceException {
CreateKeyResponse.Builder resp =
CreateKeyResponse.newBuilder();
LocateKeyResponse.Builder resp =
LocateKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
@ -242,4 +246,27 @@ public CreateKeyResponse createKey(
}
return resp.build();
}
@Override
public LocateKeyResponse lookupKey(
RpcController controller, LocateKeyRequest request
) throws ServiceException {
LocateKeyResponse.Builder resp =
LocateKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setDataSize(keyArgs.getDataSize())
.build();
KsmKeyInfo keyInfo = impl.lookupKey(ksmKeyArgs);
resp.setKeyInfo(keyInfo.getProtobuf());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
}

View File

@ -316,11 +316,21 @@ public void commitKey(KeyArgs args, OutputStream stream) throws
@Override
public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
OzoneException {
KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getSize())
.build();
KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName(), args.getKeyName());
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
String containerName = keyInfo.getContainerName();
XceiverClientSpi xceiverClient = getContainer(containerName);
boolean success = false;
try {
LOG.debug("get key accessing {} {}",
xceiverClient.getPipeline().getContainerName(), containerKey);
KeyData containerKeyData = containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,

View File

@ -19,6 +19,7 @@
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -37,10 +38,13 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.Random;
@ -54,6 +58,9 @@ public class TestKeySpaceManager {
private static UserArgs userArgs;
private static KSMMetrics ksmMetrics;
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Create a MiniDFSCluster for testing.
* <p>
@ -197,14 +204,22 @@ public void testCreateBucket() throws IOException, OzoneException {
Assert.assertEquals(0, ksmMetrics.getNumBucketInfoFails());
}
/**
* Basic test of both putKey and getKey from KSM, as one can not be tested
* without the other.
*
* @throws IOException
* @throws OzoneException
*/
@Test
public void testGetKeyWriter() throws IOException, OzoneException {
public void testGetKeyWriterReader() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
String keyName = "key" + RandomStringUtils.randomNumeric(5);
Assert.assertEquals(0, ksmMetrics.getNumKeyBlockAllocates());
long numKeyAllocates = ksmMetrics.getNumKeyAllocates();
long numKeyLookups = ksmMetrics.getNumKeyLookups();
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
@ -219,10 +234,94 @@ public void testGetKeyWriter() throws IOException, OzoneException {
String dataString = RandomStringUtils.randomAscii(100);
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
keyArgs.setSize(4096);
keyArgs.setSize(100);
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
stream.write(dataString.getBytes());
}
Assert.assertEquals(1, ksmMetrics.getNumKeyBlockAllocates());
Assert.assertEquals(1 + numKeyAllocates, ksmMetrics.getNumKeyAllocates());
byte[] data = new byte[dataString.length()];
try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
in.read(data);
}
Assert.assertEquals(dataString, DFSUtil.bytes2String(data));
Assert.assertEquals(1 + numKeyLookups, ksmMetrics.getNumKeyLookups());
}
/**
* Test write the same key twice, the second write should fail, as currently
* key overwrite is not supported.
*
* @throws IOException
* @throws OzoneException
*/
@Test
public void testKeyOverwrite() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
String keyName = "key" + RandomStringUtils.randomNumeric(5);
long numKeyAllocateFails = ksmMetrics.getNumKeyAllocateFails();
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
keyArgs.setSize(100);
String dataString = RandomStringUtils.randomAscii(100);
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
stream.write(dataString.getBytes());
}
// try to put the same keyArg, should raise KEY_ALREADY_EXISTS exception
exception.expect(IOException.class);
exception.expectMessage("KEY_ALREADY_EXISTS");
KeyArgs keyArgs2 = new KeyArgs(volumeName, bucketName, keyName, userArgs);
storageHandler.newKeyWriter(keyArgs2);
Assert.assertEquals(1 + numKeyAllocateFails,
ksmMetrics.getNumKeyAllocateFails());
}
/**
* Test get a non-exiting key.
*
* @throws IOException
* @throws OzoneException
*/
@Test
public void testGetNonExistKey() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
String keyName = "key" + RandomStringUtils.randomNumeric(5);
long numKeyLookupFails = ksmMetrics.getNumKeyLookupFails();
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
// try to get the key, should fail as it hasn't been created
exception.expect(IOException.class);
exception.expectMessage("KEY_NOT_FOUND");
storageHandler.newKeyReader(keyArgs);
Assert.assertEquals(1 + numKeyLookupFails,
ksmMetrics.getNumKeyLookupFails());
}
}