HDDS-1220. KeyManager#openKey should release the bucket lock before doing an allocateBlock. Contributed by Lokesh Jain.

This commit is contained in:
Lokesh Jain 2019-03-11 14:54:44 +05:30
parent f0605146b3
commit ebb5fa115b
2 changed files with 160 additions and 194 deletions

View File

@ -240,37 +240,49 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
OMException.ResultCodes.KEY_NOT_FOUND); OMException.ResultCodes.KEY_NOT_FOUND);
} }
AllocatedBlock allocatedBlock;
try {
allocatedBlock =
scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
keyInfo.getFactor(), omId, excludeList);
} catch (SCMException ex) {
if (ex.getResult()
.equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
}
throw ex;
}
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setLength(scmBlockSize)
.setOffset(0);
if (grpcBlockTokenEnabled) {
String remoteUser = getRemoteUser().getShortUserName();
builder.setToken(secretManager.generateToken(remoteUser,
allocatedBlock.getBlockID().toString(),
getAclForUser(remoteUser),
scmBlockSize));
}
OmKeyLocationInfo info = builder.build();
// current version not committed, so new blocks coming now are added to // current version not committed, so new blocks coming now are added to
// the same version // the same version
keyInfo.appendNewBlocks(Collections.singletonList(info)); List<OmKeyLocationInfo> locationInfos =
allocateBlock(keyInfo, excludeList, scmBlockSize);
keyInfo.appendNewBlocks(locationInfos);
keyInfo.updateModifcationTime(); keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey, metadataManager.getOpenKeyTable().put(openKey,
keyInfo); keyInfo);
return info; return locationInfos.get(0);
}
private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo,
ExcludeList excludeList, long requestedSize) throws IOException {
int numBlocks = (int) ((requestedSize - 1) / scmBlockSize + 1);
List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks);
while (requestedSize > 0) {
long allocateSize = Math.min(requestedSize, scmBlockSize);
AllocatedBlock allocatedBlock;
try {
allocatedBlock = scmBlockClient
.allocateBlock(allocateSize, keyInfo.getType(), keyInfo.getFactor(),
omId, excludeList);
} catch (SCMException ex) {
if (ex.getResult()
.equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
}
throw ex;
}
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setLength(scmBlockSize)
.setOffset(0);
if (grpcBlockTokenEnabled) {
String remoteUser = getRemoteUser().getShortUserName();
builder.setToken(secretManager
.generateToken(remoteUser, allocatedBlock.getBlockID().toString(),
getAclForUser(remoteUser), scmBlockSize));
}
locationInfos.add(builder.build());
requestedSize -= allocateSize;
}
return locationInfos;
} }
/* Optimize ugi lookup for RPC operations to avoid a trip through /* Optimize ugi lookup for RPC operations to avoid a trip through
@ -327,6 +339,10 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
ReplicationFactor factor = args.getFactor(); ReplicationFactor factor = args.getFactor();
ReplicationType type = args.getType(); ReplicationType type = args.getType();
long currentTime = Time.monotonicNowNanos(); long currentTime = Time.monotonicNowNanos();
long requestedSize = Math.min(preallocateMax, args.getDataSize());
OmKeyInfo keyInfo;
String openKey;
long openVersion;
FileEncryptionInfo encInfo = null; FileEncryptionInfo encInfo = null;
OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName); OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
@ -378,55 +394,14 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE; type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
} }
} }
long requestedSize = Math.min(preallocateMax, args.getDataSize());
List<OmKeyLocationInfo> locations = new ArrayList<>(); List<OmKeyLocationInfo> locations = new ArrayList<>();
String objectKey = metadataManager.getOzoneKey( String objectKey = metadataManager.getOzoneKey(
volumeName, bucketName, keyName); volumeName, bucketName, keyName);
// requested size is not required but more like a optimization:
// SCM looks at the requested, if it 0, no block will be allocated at
// the point, if client needs more blocks, client can always call
// allocateBlock. But if requested size is not 0, OM will preallocate
// some blocks and piggyback to client, to save RPC calls.
while (requestedSize > 0) {
long allocateSize = Math.min(scmBlockSize, requestedSize);
AllocatedBlock allocatedBlock;
try {
allocatedBlock = scmBlockClient
.allocateBlock(allocateSize, type, factor, omId,
new ExcludeList());
} catch (IOException ex) {
if (ex instanceof SCMException) {
if (((SCMException) ex).getResult()
.equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
throw new OMException(ex.getMessage(),
ResultCodes.SCM_IN_CHILL_MODE);
}
}
throw ex;
}
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setLength(allocateSize)
.setOffset(0);
if (grpcBlockTokenEnabled) {
String remoteUser = getRemoteUser().getShortUserName();
builder.setToken(secretManager.generateToken(remoteUser,
allocatedBlock.getBlockID().toString(),
getAclForUser(remoteUser),
scmBlockSize));
}
OmKeyLocationInfo subKeyInfo = builder.build();
locations.add(subKeyInfo);
requestedSize -= allocateSize;
}
// NOTE size of a key is not a hard limit on anything, it is a value that // NOTE size of a key is not a hard limit on anything, it is a value that
// client should expect, in terms of current size of key. If client sets a // client should expect, in terms of current size of key. If client sets a
// value, then this value is used, otherwise, we allocate a single block // value, then this value is used, otherwise, we allocate a single block
// which is the current size, if read by the client. // which is the current size, if read by the client.
long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize; long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
OmKeyInfo keyInfo;
long openVersion;
if (args.getIsMultipartKey()) { if (args.getIsMultipartKey()) {
// For this upload part we don't need to check in KeyTable. As this // For this upload part we don't need to check in KeyTable. As this
@ -449,7 +424,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
openVersion = 0; openVersion = 0;
} }
} }
String openKey = metadataManager.getOpenKey( openKey = metadataManager.getOpenKey(
volumeName, bucketName, keyName, currentTime); volumeName, bucketName, keyName, currentTime);
if (metadataManager.getOpenKeyTable().get(openKey) != null) { if (metadataManager.getOpenKeyTable().get(openKey) != null) {
// This should not happen. If this condition is satisfied, it means // This should not happen. If this condition is satisfied, it means
@ -464,10 +439,8 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
throw new OMException("Cannot allocate key. Not able to get a valid" + throw new OMException("Cannot allocate key. Not able to get a valid" +
"open key id.", ResultCodes.KEY_ALLOCATION_ERROR); "open key id.", ResultCodes.KEY_ALLOCATION_ERROR);
} }
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
LOG.debug("Key {} allocated in volume {} bucket {}", LOG.debug("Key {} allocated in volume {} bucket {}",
keyName, volumeName, bucketName); keyName, volumeName, bucketName);
return new OpenKeySession(currentTime, keyInfo, openVersion);
} catch (OMException e) { } catch (OMException e) {
throw e; throw e;
} catch (IOException ex) { } catch (IOException ex) {
@ -478,6 +451,19 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
} finally { } finally {
metadataManager.getLock().releaseBucketLock(volumeName, bucketName); metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
} }
// requested size is not required but more like a optimization:
// SCM looks at the requested, if it 0, no block will be allocated at
// the point, if client needs more blocks, client can always call
// allocateBlock. But if requested size is not 0, OM will preallocate
// some blocks and piggyback to client, to save RPC calls.
if (requestedSize > 0) {
List<OmKeyLocationInfo> locationInfos =
allocateBlock(keyInfo, new ExcludeList(), requestedSize);
keyInfo.appendNewBlocks(locationInfos);
}
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
return new OpenKeySession(currentTime, keyInfo, openVersion);
} }
/** /**

View File

@ -18,177 +18,141 @@
package org.apache.hadoop.ozone.om; package org.apache.hadoop.ozone.om;
import java.util.Arrays; import java.io.File;
import java.util.HashSet; import java.io.IOException;
import java.util.Set; import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec; import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec;
import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.*;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.db.CodecRegistry;
import org.apache.hadoop.utils.db.RDBStore;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.utils.db.TableConfig;
import org.junit.Before; import org.junit.Assert;
import org.junit.Rule; import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions; import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
/** /**
* Test class for @{@link KeyManagerImpl}. * Test class for @{@link KeyManagerImpl}.
* */ */
public class TestKeyManagerImpl { public class TestKeyManagerImpl {
private static KeyManagerImpl keyManager; private static KeyManagerImpl keyManager;
private static ScmBlockLocationProtocol scmBlockLocationProtocol; private static VolumeManagerImpl volumeManager;
private static BucketManagerImpl bucketManager;
private static StorageContainerManager scm;
private static ScmBlockLocationProtocol mockScmBlockLocationProtocol;
private static OzoneConfiguration conf; private static OzoneConfiguration conf;
private static OMMetadataManager metadataManager; private static OMMetadataManager metadataManager;
private static long blockSize = 1000; private static File dir;
private static long scmBlockSize;
private static final String KEY_NAME = "key1"; private static final String KEY_NAME = "key1";
private static final String BUCKET_NAME = "bucket1"; private static final String BUCKET_NAME = "bucket1";
private static final String VOLUME_NAME = "vol1"; private static final String VOLUME_NAME = "vol1";
private static RDBStore rdbStore = null;
private static Table<String, OmKeyInfo> keyTable = null;
private static Table<String, OmBucketInfo> bucketTable = null;
private static Table<String, OmVolumeArgs> volumeTable = null;
private static DBOptions options = null;
private KeyInfo keyData;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Before @BeforeClass
public void setUp() throws Exception { public static void setUp() throws Exception {
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
scmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class); dir = GenericTestUtils.getRandomizedTestDir();
metadataManager = Mockito.mock(OMMetadataManager.class); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
keyManager = new KeyManagerImpl(scmBlockLocationProtocol, metadataManager, mockScmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
conf, "om1", null); metadataManager = new OmMetadataManagerImpl(conf);
setupMocks(); volumeManager = new VolumeManagerImpl(metadataManager, conf);
} bucketManager = new BucketManagerImpl(metadataManager);
NodeManager nodeManager = new MockNodeManager(true, 10);
SCMConfigurator configurator = new SCMConfigurator();
configurator.setScmNodeManager(nodeManager);
scm = TestUtils.getScm(conf, configurator);
scm.start();
scm.exitChillMode();
scmBlockSize = (long) conf
.getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
StorageUnit.BYTES);
conf.setLong(OZONE_KEY_PREALLOCATION_MAXSIZE, scmBlockSize * 10);
private void setupMocks() throws Exception { keyManager =
Mockito.when(scmBlockLocationProtocol new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf,
"om1", null);
Mockito.when(mockScmBlockLocationProtocol
.allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class), .allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class),
Mockito.any(ReplicationFactor.class), Mockito.anyString(), Mockito.any(ReplicationFactor.class), Mockito.anyString(),
Mockito.any(ExcludeList.class))) Mockito.any(ExcludeList.class))).thenThrow(
.thenThrow( new SCMException("ChillModePrecheck failed for allocateBlock",
new SCMException("ChillModePrecheck failed for allocateBlock", ResultCodes.CHILL_MODE_EXCEPTION));
ResultCodes.CHILL_MODE_EXCEPTION)); createVolume(VOLUME_NAME);
setupRocksDb(); createBucket(VOLUME_NAME, BUCKET_NAME);
Mockito.when(metadataManager.getVolumeTable()).thenReturn(volumeTable);
Mockito.when(metadataManager.getBucketTable()).thenReturn(bucketTable);
Mockito.when(metadataManager.getOpenKeyTable()).thenReturn(keyTable);
Mockito.when(metadataManager.getLock())
.thenReturn(new OzoneManagerLock(conf));
Mockito.when(metadataManager.getVolumeKey(VOLUME_NAME))
.thenReturn(VOLUME_NAME);
Mockito.when(metadataManager.getBucketKey(VOLUME_NAME, BUCKET_NAME))
.thenReturn(BUCKET_NAME);
Mockito.when(metadataManager.getOpenKey(VOLUME_NAME, BUCKET_NAME,
KEY_NAME, 1)).thenReturn(KEY_NAME);
} }
private void setupRocksDb() throws Exception { @AfterClass
options = new DBOptions(); public static void cleanup() throws Exception {
options.setCreateIfMissing(true); scm.stop();
options.setCreateMissingColumnFamilies(true); scm.join();
metadataManager.stop();
keyManager.stop();
FileUtils.deleteDirectory(dir);
}
Statistics statistics = new Statistics(); private static void createBucket(String volumeName, String bucketName)
statistics.setStatsLevel(StatsLevel.ALL); throws IOException {
options = options.setStatistics(statistics); OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
.setVolumeName(volumeName)
Set<TableConfig> configSet = new HashSet<>(); .setBucketName(bucketName)
for (String name : Arrays
.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
"testKeyTable", "testBucketTable", "testVolumeTable")) {
TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions());
configSet.add(newConfig);
}
keyData = KeyInfo.newBuilder()
.setKeyName(KEY_NAME)
.setBucketName(BUCKET_NAME)
.setVolumeName(VOLUME_NAME)
.setDataSize(blockSize)
.setType(ReplicationType.STAND_ALONE)
.setFactor(ReplicationFactor.ONE)
.setCreationTime(Time.now())
.setModificationTime(Time.now())
.build(); .build();
bucketManager.createBucket(bucketInfo);
}
CodecRegistry registry = new CodecRegistry(); private static void createVolume(String volumeName) throws IOException {
registry.addCodec(OmKeyInfo.class, new OmKeyInfoCodec()); OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
registry.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec()); .setVolume(volumeName)
registry.addCodec(OmBucketInfo.class, new OmBucketInfoCodec()); .setAdminName("bilbo")
rdbStore = new RDBStore(folder.newFolder(), options, configSet, registry); .setOwnerName("bilbo")
.build();
keyTable = volumeManager.createVolume(volumeArgs);
rdbStore.getTable("testKeyTable", String.class, OmKeyInfo.class);
bucketTable =
rdbStore.getTable("testBucketTable", String.class, OmBucketInfo.class);
volumeTable =
rdbStore.getTable("testVolumeTable", String.class, OmVolumeArgs.class);
volumeTable.put(VOLUME_NAME, OmVolumeArgs.newBuilder()
.setAdminName("a")
.setOwnerName("o")
.setVolume(VOLUME_NAME)
.build());
bucketTable.put(BUCKET_NAME,
new OmBucketInfo.Builder().setBucketName(BUCKET_NAME)
.setVolumeName(VOLUME_NAME).build());
keyTable.put(KEY_NAME, new OmKeyInfo.Builder()
.setVolumeName(VOLUME_NAME)
.setBucketName(BUCKET_NAME)
.setKeyName(KEY_NAME)
.setReplicationType(ReplicationType.STAND_ALONE)
.setReplicationFactor(ReplicationFactor.THREE)
.build());
} }
@Test @Test
public void allocateBlockFailureInChillMode() throws Exception { public void allocateBlockFailureInChillMode() throws Exception {
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME) KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
metadataManager, conf, "om1", null);
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setKeyName(KEY_NAME)
.setBucketName(BUCKET_NAME) .setBucketName(BUCKET_NAME)
.setFactor(ReplicationFactor.ONE) .setFactor(ReplicationFactor.ONE)
.setType(ReplicationType.STAND_ALONE) .setType(ReplicationType.STAND_ALONE)
.setVolumeName(VOLUME_NAME).build(); .setVolumeName(VOLUME_NAME)
.build();
OpenKeySession keySession = keyManager1.openKey(keyArgs);
LambdaTestUtils.intercept(OMException.class, LambdaTestUtils.intercept(OMException.class,
"ChillModePrecheck failed for allocateBlock", () -> { "ChillModePrecheck failed for allocateBlock", () -> {
keyManager.allocateBlock(keyArgs, 1, new ExcludeList()); keyManager1
.allocateBlock(keyArgs, keySession.getId(), new ExcludeList());
}); });
} }
@Test @Test
public void openKeyFailureInChillMode() throws Exception { public void openKeyFailureInChillMode() throws Exception {
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME) KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
metadataManager, conf, "om1", null);
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setKeyName(KEY_NAME)
.setBucketName(BUCKET_NAME) .setBucketName(BUCKET_NAME)
.setFactor(ReplicationFactor.ONE) .setFactor(ReplicationFactor.ONE)
.setDataSize(1000) .setDataSize(1000)
@ -196,7 +160,23 @@ public void openKeyFailureInChillMode() throws Exception {
.setVolumeName(VOLUME_NAME).build(); .setVolumeName(VOLUME_NAME).build();
LambdaTestUtils.intercept(OMException.class, LambdaTestUtils.intercept(OMException.class,
"ChillModePrecheck failed for allocateBlock", () -> { "ChillModePrecheck failed for allocateBlock", () -> {
keyManager.openKey(keyArgs); keyManager1.openKey(keyArgs);
}); });
} }
@Test
public void openKeyWithMultipleBlocks() throws IOException {
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setKeyName(UUID.randomUUID().toString())
.setBucketName(BUCKET_NAME)
.setFactor(ReplicationFactor.ONE)
.setDataSize(scmBlockSize * 10)
.setType(ReplicationType.STAND_ALONE)
.setVolumeName(VOLUME_NAME)
.build();
OpenKeySession keySession = keyManager.openKey(keyArgs);
OmKeyInfo keyInfo = keySession.getKeyInfo();
Assert.assertEquals(10,
keyInfo.getLatestVersionLocations().getLocationList().size());
}
} }