diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 5caeaeaa2e..6500a5ec18 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -240,37 +240,49 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, OMException.ResultCodes.KEY_NOT_FOUND); } - AllocatedBlock allocatedBlock; - try { - allocatedBlock = - scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(), - keyInfo.getFactor(), omId, excludeList); - } catch (SCMException ex) { - if (ex.getResult() - .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) { - throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE); - } - throw ex; - } - OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() - .setBlockID(new BlockID(allocatedBlock.getBlockID())) - .setLength(scmBlockSize) - .setOffset(0); - if (grpcBlockTokenEnabled) { - String remoteUser = getRemoteUser().getShortUserName(); - builder.setToken(secretManager.generateToken(remoteUser, - allocatedBlock.getBlockID().toString(), - getAclForUser(remoteUser), - scmBlockSize)); - } - OmKeyLocationInfo info = builder.build(); // current version not committed, so new blocks coming now are added to // the same version - keyInfo.appendNewBlocks(Collections.singletonList(info)); + List locationInfos = + allocateBlock(keyInfo, excludeList, scmBlockSize); + keyInfo.appendNewBlocks(locationInfos); keyInfo.updateModifcationTime(); metadataManager.getOpenKeyTable().put(openKey, keyInfo); - return info; + return locationInfos.get(0); + } + + private List allocateBlock(OmKeyInfo keyInfo, + ExcludeList excludeList, long requestedSize) throws IOException { + int numBlocks = (int) ((requestedSize - 1) / scmBlockSize + 1); + List locationInfos = new ArrayList<>(numBlocks); + while (requestedSize > 0) { + long allocateSize = Math.min(requestedSize, scmBlockSize); + AllocatedBlock allocatedBlock; + try { + allocatedBlock = scmBlockClient + .allocateBlock(allocateSize, keyInfo.getType(), keyInfo.getFactor(), + omId, excludeList); + } catch (SCMException ex) { + if (ex.getResult() + .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) { + throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE); + } + throw ex; + } + OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() + .setBlockID(new BlockID(allocatedBlock.getBlockID())) + .setLength(scmBlockSize) + .setOffset(0); + if (grpcBlockTokenEnabled) { + String remoteUser = getRemoteUser().getShortUserName(); + builder.setToken(secretManager + .generateToken(remoteUser, allocatedBlock.getBlockID().toString(), + getAclForUser(remoteUser), scmBlockSize)); + } + locationInfos.add(builder.build()); + requestedSize -= allocateSize; + } + return locationInfos; } /* Optimize ugi lookup for RPC operations to avoid a trip through @@ -327,6 +339,10 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { ReplicationFactor factor = args.getFactor(); ReplicationType type = args.getType(); long currentTime = Time.monotonicNowNanos(); + long requestedSize = Math.min(preallocateMax, args.getDataSize()); + OmKeyInfo keyInfo; + String openKey; + long openVersion; FileEncryptionInfo encInfo = null; OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName); @@ -378,55 +394,14 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE; } } - long requestedSize = Math.min(preallocateMax, args.getDataSize()); List locations = new ArrayList<>(); String objectKey = metadataManager.getOzoneKey( volumeName, bucketName, keyName); - // requested size is not required but more like a optimization: - // SCM looks at the requested, if it 0, no block will be allocated at - // the point, if client needs more blocks, client can always call - // allocateBlock. But if requested size is not 0, OM will preallocate - // some blocks and piggyback to client, to save RPC calls. - while (requestedSize > 0) { - long allocateSize = Math.min(scmBlockSize, requestedSize); - AllocatedBlock allocatedBlock; - try { - allocatedBlock = scmBlockClient - .allocateBlock(allocateSize, type, factor, omId, - new ExcludeList()); - } catch (IOException ex) { - if (ex instanceof SCMException) { - if (((SCMException) ex).getResult() - .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) { - throw new OMException(ex.getMessage(), - ResultCodes.SCM_IN_CHILL_MODE); - } - } - throw ex; - } - OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() - .setBlockID(new BlockID(allocatedBlock.getBlockID())) - .setLength(allocateSize) - .setOffset(0); - if (grpcBlockTokenEnabled) { - String remoteUser = getRemoteUser().getShortUserName(); - builder.setToken(secretManager.generateToken(remoteUser, - allocatedBlock.getBlockID().toString(), - getAclForUser(remoteUser), - scmBlockSize)); - } - - OmKeyLocationInfo subKeyInfo = builder.build(); - locations.add(subKeyInfo); - requestedSize -= allocateSize; - } // NOTE size of a key is not a hard limit on anything, it is a value that // client should expect, in terms of current size of key. If client sets a // value, then this value is used, otherwise, we allocate a single block // which is the current size, if read by the client. long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize; - OmKeyInfo keyInfo; - long openVersion; if (args.getIsMultipartKey()) { // For this upload part we don't need to check in KeyTable. As this @@ -449,7 +424,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { openVersion = 0; } } - String openKey = metadataManager.getOpenKey( + openKey = metadataManager.getOpenKey( volumeName, bucketName, keyName, currentTime); if (metadataManager.getOpenKeyTable().get(openKey) != null) { // This should not happen. If this condition is satisfied, it means @@ -464,10 +439,8 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { throw new OMException("Cannot allocate key. Not able to get a valid" + "open key id.", ResultCodes.KEY_ALLOCATION_ERROR); } - metadataManager.getOpenKeyTable().put(openKey, keyInfo); LOG.debug("Key {} allocated in volume {} bucket {}", keyName, volumeName, bucketName); - return new OpenKeySession(currentTime, keyInfo, openVersion); } catch (OMException e) { throw e; } catch (IOException ex) { @@ -478,6 +451,19 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { } finally { metadataManager.getLock().releaseBucketLock(volumeName, bucketName); } + + // requested size is not required but more like a optimization: + // SCM looks at the requested, if it 0, no block will be allocated at + // the point, if client needs more blocks, client can always call + // allocateBlock. But if requested size is not 0, OM will preallocate + // some blocks and piggyback to client, to save RPC calls. + if (requestedSize > 0) { + List locationInfos = + allocateBlock(keyInfo, new ExcludeList(), requestedSize); + keyInfo.appendNewBlocks(locationInfos); + } + metadataManager.getOpenKeyTable().put(openKey, keyInfo); + return new OpenKeySession(currentTime, keyInfo, openVersion); } /** diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index 992ccafba2..a76d052b41 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -18,177 +18,141 @@ package org.apache.hadoop.ozone.om; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec; -import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec; -import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec; +import org.apache.hadoop.hdds.scm.server.SCMConfigurator; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo; +import org.apache.hadoop.ozone.om.helpers.*; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.db.CodecRegistry; -import org.apache.hadoop.utils.db.RDBStore; -import org.apache.hadoop.utils.db.Table; -import org.apache.hadoop.utils.db.TableConfig; -import org.junit.Before; -import org.junit.Rule; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.AfterClass; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; -import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DBOptions; -import org.rocksdb.RocksDB; -import org.rocksdb.Statistics; -import org.rocksdb.StatsLevel; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.*; /** * Test class for @{@link KeyManagerImpl}. - * */ + */ public class TestKeyManagerImpl { private static KeyManagerImpl keyManager; - private static ScmBlockLocationProtocol scmBlockLocationProtocol; + private static VolumeManagerImpl volumeManager; + private static BucketManagerImpl bucketManager; + private static StorageContainerManager scm; + private static ScmBlockLocationProtocol mockScmBlockLocationProtocol; private static OzoneConfiguration conf; private static OMMetadataManager metadataManager; - private static long blockSize = 1000; + private static File dir; + private static long scmBlockSize; private static final String KEY_NAME = "key1"; private static final String BUCKET_NAME = "bucket1"; private static final String VOLUME_NAME = "vol1"; - private static RDBStore rdbStore = null; - private static Table keyTable = null; - private static Table bucketTable = null; - private static Table volumeTable = null; - private static DBOptions options = null; - private KeyInfo keyData; - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - @Before - public void setUp() throws Exception { + @BeforeClass + public static void setUp() throws Exception { conf = new OzoneConfiguration(); - scmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class); - metadataManager = Mockito.mock(OMMetadataManager.class); - keyManager = new KeyManagerImpl(scmBlockLocationProtocol, metadataManager, - conf, "om1", null); - setupMocks(); - } + dir = GenericTestUtils.getRandomizedTestDir(); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString()); + mockScmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class); + metadataManager = new OmMetadataManagerImpl(conf); + volumeManager = new VolumeManagerImpl(metadataManager, conf); + bucketManager = new BucketManagerImpl(metadataManager); + NodeManager nodeManager = new MockNodeManager(true, 10); + SCMConfigurator configurator = new SCMConfigurator(); + configurator.setScmNodeManager(nodeManager); + scm = TestUtils.getScm(conf, configurator); + scm.start(); + scm.exitChillMode(); + scmBlockSize = (long) conf + .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, + StorageUnit.BYTES); + conf.setLong(OZONE_KEY_PREALLOCATION_MAXSIZE, scmBlockSize * 10); - private void setupMocks() throws Exception { - Mockito.when(scmBlockLocationProtocol + keyManager = + new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf, + "om1", null); + Mockito.when(mockScmBlockLocationProtocol .allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class), Mockito.any(ReplicationFactor.class), Mockito.anyString(), - Mockito.any(ExcludeList.class))) - .thenThrow( - new SCMException("ChillModePrecheck failed for allocateBlock", - ResultCodes.CHILL_MODE_EXCEPTION)); - setupRocksDb(); - Mockito.when(metadataManager.getVolumeTable()).thenReturn(volumeTable); - Mockito.when(metadataManager.getBucketTable()).thenReturn(bucketTable); - Mockito.when(metadataManager.getOpenKeyTable()).thenReturn(keyTable); - Mockito.when(metadataManager.getLock()) - .thenReturn(new OzoneManagerLock(conf)); - Mockito.when(metadataManager.getVolumeKey(VOLUME_NAME)) - .thenReturn(VOLUME_NAME); - Mockito.when(metadataManager.getBucketKey(VOLUME_NAME, BUCKET_NAME)) - .thenReturn(BUCKET_NAME); - Mockito.when(metadataManager.getOpenKey(VOLUME_NAME, BUCKET_NAME, - KEY_NAME, 1)).thenReturn(KEY_NAME); + Mockito.any(ExcludeList.class))).thenThrow( + new SCMException("ChillModePrecheck failed for allocateBlock", + ResultCodes.CHILL_MODE_EXCEPTION)); + createVolume(VOLUME_NAME); + createBucket(VOLUME_NAME, BUCKET_NAME); } - private void setupRocksDb() throws Exception { - options = new DBOptions(); - options.setCreateIfMissing(true); - options.setCreateMissingColumnFamilies(true); + @AfterClass + public static void cleanup() throws Exception { + scm.stop(); + scm.join(); + metadataManager.stop(); + keyManager.stop(); + FileUtils.deleteDirectory(dir); + } - Statistics statistics = new Statistics(); - statistics.setStatsLevel(StatsLevel.ALL); - options = options.setStatistics(statistics); - - Set configSet = new HashSet<>(); - for (String name : Arrays - .asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY), - "testKeyTable", "testBucketTable", "testVolumeTable")) { - TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions()); - configSet.add(newConfig); - } - keyData = KeyInfo.newBuilder() - .setKeyName(KEY_NAME) - .setBucketName(BUCKET_NAME) - .setVolumeName(VOLUME_NAME) - .setDataSize(blockSize) - .setType(ReplicationType.STAND_ALONE) - .setFactor(ReplicationFactor.ONE) - .setCreationTime(Time.now()) - .setModificationTime(Time.now()) + private static void createBucket(String volumeName, String bucketName) + throws IOException { + OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) .build(); + bucketManager.createBucket(bucketInfo); + } - CodecRegistry registry = new CodecRegistry(); - registry.addCodec(OmKeyInfo.class, new OmKeyInfoCodec()); - registry.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec()); - registry.addCodec(OmBucketInfo.class, new OmBucketInfoCodec()); - rdbStore = new RDBStore(folder.newFolder(), options, configSet, registry); - - keyTable = - rdbStore.getTable("testKeyTable", String.class, OmKeyInfo.class); - - bucketTable = - rdbStore.getTable("testBucketTable", String.class, OmBucketInfo.class); - - volumeTable = - rdbStore.getTable("testVolumeTable", String.class, OmVolumeArgs.class); - - volumeTable.put(VOLUME_NAME, OmVolumeArgs.newBuilder() - .setAdminName("a") - .setOwnerName("o") - .setVolume(VOLUME_NAME) - .build()); - - bucketTable.put(BUCKET_NAME, - new OmBucketInfo.Builder().setBucketName(BUCKET_NAME) - .setVolumeName(VOLUME_NAME).build()); - - keyTable.put(KEY_NAME, new OmKeyInfo.Builder() - .setVolumeName(VOLUME_NAME) - .setBucketName(BUCKET_NAME) - .setKeyName(KEY_NAME) - .setReplicationType(ReplicationType.STAND_ALONE) - .setReplicationFactor(ReplicationFactor.THREE) - .build()); - + private static void createVolume(String volumeName) throws IOException { + OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder() + .setVolume(volumeName) + .setAdminName("bilbo") + .setOwnerName("bilbo") + .build(); + volumeManager.createVolume(volumeArgs); } @Test public void allocateBlockFailureInChillMode() throws Exception { - OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME) + KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol, + metadataManager, conf, "om1", null); + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setKeyName(KEY_NAME) .setBucketName(BUCKET_NAME) .setFactor(ReplicationFactor.ONE) .setType(ReplicationType.STAND_ALONE) - .setVolumeName(VOLUME_NAME).build(); + .setVolumeName(VOLUME_NAME) + .build(); + OpenKeySession keySession = keyManager1.openKey(keyArgs); LambdaTestUtils.intercept(OMException.class, "ChillModePrecheck failed for allocateBlock", () -> { - keyManager.allocateBlock(keyArgs, 1, new ExcludeList()); + keyManager1 + .allocateBlock(keyArgs, keySession.getId(), new ExcludeList()); }); } @Test public void openKeyFailureInChillMode() throws Exception { - OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME) + KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol, + metadataManager, conf, "om1", null); + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setKeyName(KEY_NAME) .setBucketName(BUCKET_NAME) .setFactor(ReplicationFactor.ONE) .setDataSize(1000) @@ -196,7 +160,23 @@ public void openKeyFailureInChillMode() throws Exception { .setVolumeName(VOLUME_NAME).build(); LambdaTestUtils.intercept(OMException.class, "ChillModePrecheck failed for allocateBlock", () -> { - keyManager.openKey(keyArgs); + keyManager1.openKey(keyArgs); }); } + + @Test + public void openKeyWithMultipleBlocks() throws IOException { + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setKeyName(UUID.randomUUID().toString()) + .setBucketName(BUCKET_NAME) + .setFactor(ReplicationFactor.ONE) + .setDataSize(scmBlockSize * 10) + .setType(ReplicationType.STAND_ALONE) + .setVolumeName(VOLUME_NAME) + .build(); + OpenKeySession keySession = keyManager.openKey(keyArgs); + OmKeyInfo keyInfo = keySession.getKeyInfo(); + Assert.assertEquals(10, + keyInfo.getLatestVersionLocations().getLocationList().size()); + } } \ No newline at end of file