HDDS-488. Handle chill mode exception from SCM in OzoneManager. Contributed by Ajay Kumar.
This commit is contained in:
parent
17f5651a51
commit
3929653707
@ -167,7 +167,7 @@ public static class Builder {
|
||||
private StorageType storageType;
|
||||
private long creationTime;
|
||||
|
||||
Builder() {
|
||||
public Builder() {
|
||||
//Default values
|
||||
this.acls = new LinkedList<>();
|
||||
this.isVersionEnabled = false;
|
||||
|
@ -151,7 +151,7 @@ public static class Builder {
|
||||
/**
|
||||
* Constructs a builder.
|
||||
*/
|
||||
Builder() {
|
||||
public Builder() {
|
||||
keyValueMap = new HashMap<>();
|
||||
aclMap = new OmOzoneAclMap();
|
||||
}
|
||||
|
@ -0,0 +1,171 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.TestStorageContainerManagerHelper;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Test Ozone Manager operation in distributed handler scenario.
|
||||
*/
|
||||
public class TestScmChillMode {
|
||||
|
||||
private static MiniOzoneCluster cluster = null;
|
||||
private static MiniOzoneCluster.Builder builder = null;
|
||||
private static OzoneConfiguration conf;
|
||||
private static OzoneManager om;
|
||||
|
||||
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(1000 * 200);
|
||||
|
||||
/**
|
||||
* Create a MiniDFSCluster for testing.
|
||||
* <p>
|
||||
* Ozone is made active by setting OZONE_ENABLED = true and
|
||||
* OZONE_HANDLER_TYPE_KEY = "distributed"
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
builder = MiniOzoneCluster.newBuilder(conf)
|
||||
.setHbInterval(1000)
|
||||
.setHbProcessorInterval(500)
|
||||
.setStartDataNodes(false);
|
||||
cluster = builder.build();
|
||||
cluster.startHddsDatanodes();
|
||||
cluster.waitForClusterToBeReady();
|
||||
om = cluster.getOzoneManager();
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown MiniDFSCluster.
|
||||
*/
|
||||
@After
|
||||
public void shutdown() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChillModeOperations() throws Exception {
|
||||
final AtomicReference<MiniOzoneCluster> miniCluster =
|
||||
new AtomicReference<>();
|
||||
|
||||
try {
|
||||
// Create {numKeys} random names keys.
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100, 4096);
|
||||
final List<ContainerInfo> containers = cluster
|
||||
.getStorageContainerManager()
|
||||
.getScmContainerManager().getStateManager().getAllContainers();
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return containers.size() > 10;
|
||||
}, 100, 1000);
|
||||
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
|
||||
String keyName = "key" + RandomStringUtils.randomNumeric(5);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
||||
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setDataSize(1000)
|
||||
.build();
|
||||
OmVolumeArgs volArgs = new OmVolumeArgs.Builder()
|
||||
.setAdminName(adminName)
|
||||
.setCreationTime(Time.monotonicNow())
|
||||
.setQuotaInBytes(10000)
|
||||
.setVolume(volumeName)
|
||||
.setOwnerName(userName)
|
||||
.build();
|
||||
OmBucketInfo bucketInfo = new OmBucketInfo.Builder()
|
||||
.setBucketName(bucketName)
|
||||
.setIsVersionEnabled(false)
|
||||
.setVolumeName(volumeName)
|
||||
.build();
|
||||
om.createVolume(volArgs);
|
||||
om.createBucket(bucketInfo);
|
||||
om.openKey(keyArgs);
|
||||
//om.commitKey(keyArgs, 1);
|
||||
|
||||
cluster.stop();
|
||||
|
||||
new Thread(() -> {
|
||||
try {
|
||||
miniCluster.set(builder.build());
|
||||
} catch (IOException e) {
|
||||
fail("failed");
|
||||
}
|
||||
}).start();
|
||||
|
||||
StorageContainerManager scm;
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return miniCluster.get() != null;
|
||||
}, 100, 1000 * 3);
|
||||
|
||||
scm = miniCluster.get().getStorageContainerManager();
|
||||
Assert.assertTrue(scm.isInChillMode());
|
||||
|
||||
om = miniCluster.get().getOzoneManager();
|
||||
|
||||
LambdaTestUtils.intercept(OMException.class,
|
||||
"ChillModePrecheck failed for allocateBlock",
|
||||
() -> om.openKey(keyArgs));
|
||||
|
||||
} finally {
|
||||
if (miniCluster.get() != null) {
|
||||
try {
|
||||
miniCluster.get().shutdown();
|
||||
} catch (Exception e) {
|
||||
// do nothing.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
@ -155,9 +156,18 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
|
||||
}
|
||||
OmKeyInfo keyInfo =
|
||||
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
|
||||
AllocatedBlock allocatedBlock =
|
||||
scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
|
||||
keyInfo.getFactor(), omId);
|
||||
AllocatedBlock allocatedBlock;
|
||||
try {
|
||||
allocatedBlock =
|
||||
scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
|
||||
keyInfo.getFactor(), omId);
|
||||
} catch (SCMException ex) {
|
||||
if (ex.getResult()
|
||||
.equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
|
||||
throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
|
||||
.setBlockID(allocatedBlock.getBlockID())
|
||||
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
|
||||
@ -208,8 +218,20 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
|
||||
// some blocks and piggyback to client, to save RPC calls.
|
||||
while (requestedSize > 0) {
|
||||
long allocateSize = Math.min(scmBlockSize, requestedSize);
|
||||
AllocatedBlock allocatedBlock =
|
||||
scmBlockClient.allocateBlock(allocateSize, type, factor, omId);
|
||||
AllocatedBlock allocatedBlock;
|
||||
try {
|
||||
allocatedBlock = scmBlockClient
|
||||
.allocateBlock(allocateSize, type, factor, omId);
|
||||
} catch (IOException ex) {
|
||||
if (ex instanceof SCMException) {
|
||||
if (((SCMException) ex).getResult()
|
||||
.equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
|
||||
throw new OMException(ex.getMessage(),
|
||||
ResultCodes.SCM_IN_CHILL_MODE);
|
||||
}
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder()
|
||||
.setBlockID(allocatedBlock.getBlockID())
|
||||
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
|
||||
|
@ -113,6 +113,7 @@ public enum ResultCodes {
|
||||
FAILED_METADATA_ERROR,
|
||||
FAILED_INTERNAL_ERROR,
|
||||
OM_NOT_INITIALIZED,
|
||||
SCM_VERSION_MISMATCH_ERROR
|
||||
SCM_VERSION_MISMATCH_ERROR,
|
||||
SCM_IN_CHILL_MODE
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,165 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.db.RDBStore;
|
||||
import org.apache.hadoop.utils.db.Table;
|
||||
import org.apache.hadoop.utils.db.TableConfig;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.mockito.Mockito;
|
||||
import org.rocksdb.ColumnFamilyOptions;
|
||||
import org.rocksdb.DBOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.Statistics;
|
||||
import org.rocksdb.StatsLevel;
|
||||
|
||||
/**
|
||||
* Test class for @{@link KeyManagerImpl}.
|
||||
* */
|
||||
public class TestKeyManagerImpl {
|
||||
|
||||
private static KeyManagerImpl keyManager;
|
||||
private static ScmBlockLocationProtocol scmBlockLocationProtocol;
|
||||
private static OzoneConfiguration conf;
|
||||
private static OMMetadataManager metadataManager;
|
||||
private static long blockSize = 1000;
|
||||
private static final String KEY_NAME = "key1";
|
||||
private static final String BUCKET_NAME = "bucket1";
|
||||
private static final String VOLUME_NAME = "vol1";
|
||||
private static RDBStore rdbStore = null;
|
||||
private static Table rdbTable = null;
|
||||
private static DBOptions options = null;
|
||||
private KeyInfo keyData;
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
scmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
|
||||
metadataManager = Mockito.mock(OMMetadataManager.class);
|
||||
keyManager = new KeyManagerImpl(scmBlockLocationProtocol, metadataManager,
|
||||
conf, "om1");
|
||||
setupMocks();
|
||||
}
|
||||
|
||||
private void setupMocks() throws Exception {
|
||||
Mockito.when(scmBlockLocationProtocol
|
||||
.allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class),
|
||||
Mockito.any(ReplicationFactor.class), Mockito.anyString()))
|
||||
.thenThrow(
|
||||
new SCMException("ChillModePrecheck failed for allocateBlock",
|
||||
ResultCodes.CHILL_MODE_EXCEPTION));
|
||||
setupRocksDb();
|
||||
Mockito.when(metadataManager.getVolumeTable()).thenReturn(rdbTable);
|
||||
Mockito.when(metadataManager.getBucketTable()).thenReturn(rdbTable);
|
||||
Mockito.when(metadataManager.getOpenKeyTable()).thenReturn(rdbTable);
|
||||
Mockito.when(metadataManager.getLock())
|
||||
.thenReturn(new OzoneManagerLock(conf));
|
||||
Mockito.when(metadataManager.getVolumeKey(VOLUME_NAME))
|
||||
.thenReturn(VOLUME_NAME.getBytes());
|
||||
Mockito.when(metadataManager.getBucketKey(VOLUME_NAME, BUCKET_NAME))
|
||||
.thenReturn(BUCKET_NAME.getBytes());
|
||||
Mockito.when(metadataManager.getOpenKeyBytes(VOLUME_NAME, BUCKET_NAME,
|
||||
KEY_NAME, 1)).thenReturn(KEY_NAME.getBytes());
|
||||
}
|
||||
|
||||
private void setupRocksDb() throws Exception {
|
||||
options = new DBOptions();
|
||||
options.setCreateIfMissing(true);
|
||||
options.setCreateMissingColumnFamilies(true);
|
||||
|
||||
Statistics statistics = new Statistics();
|
||||
statistics.setStatsLevel(StatsLevel.ALL);
|
||||
options = options.setStatistics(statistics);
|
||||
|
||||
Set<TableConfig> configSet = new HashSet<>();
|
||||
for (String name : Arrays
|
||||
.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
|
||||
"testTable")) {
|
||||
TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions());
|
||||
configSet.add(newConfig);
|
||||
}
|
||||
keyData = KeyInfo.newBuilder()
|
||||
.setKeyName(KEY_NAME)
|
||||
.setBucketName(BUCKET_NAME)
|
||||
.setVolumeName(VOLUME_NAME)
|
||||
.setDataSize(blockSize)
|
||||
.setType(ReplicationType.STAND_ALONE)
|
||||
.setFactor(ReplicationFactor.ONE)
|
||||
.setCreationTime(Time.now())
|
||||
.setModificationTime(Time.now())
|
||||
.build();
|
||||
|
||||
rdbStore = new RDBStore(folder.newFolder(), options, configSet);
|
||||
rdbTable = rdbStore.getTable("testTable");
|
||||
rdbTable.put(VOLUME_NAME.getBytes(),
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8));
|
||||
rdbTable.put(BUCKET_NAME.getBytes(),
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8));
|
||||
rdbTable.put(KEY_NAME.getBytes(), keyData.toByteArray());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void allocateBlockFailureInChillMode() throws Exception {
|
||||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME)
|
||||
.setBucketName(BUCKET_NAME)
|
||||
.setFactor(ReplicationFactor.ONE)
|
||||
.setType(ReplicationType.STAND_ALONE)
|
||||
.setVolumeName(VOLUME_NAME).build();
|
||||
LambdaTestUtils.intercept(OMException.class,
|
||||
"ChillModePrecheck failed for allocateBlock", () -> {
|
||||
keyManager.allocateBlock(keyArgs, 1);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void openKeyFailureInChillMode() throws Exception {
|
||||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME)
|
||||
.setBucketName(BUCKET_NAME)
|
||||
.setFactor(ReplicationFactor.ONE)
|
||||
.setDataSize(1000)
|
||||
.setType(ReplicationType.STAND_ALONE)
|
||||
.setVolumeName(VOLUME_NAME).build();
|
||||
LambdaTestUtils.intercept(OMException.class,
|
||||
"ChillModePrecheck failed for allocateBlock", () -> {
|
||||
keyManager.openKey(keyArgs);
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user