HDDS-370. Add and implement following functions in SCMClientProtocolServer. Contributed by Ajay Kumar.
This commit is contained in:
parent
a4abf02028
commit
3928af983e
@ -133,4 +133,20 @@ Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
|
||||
* @throws IOException
|
||||
*/
|
||||
ScmInfo getScmInfo() throws IOException;
|
||||
|
||||
/**
|
||||
* Check if SCM is in chill mode.
|
||||
*
|
||||
* @return Returns true if SCM is in chill mode else returns false.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean inChillMode() throws IOException;
|
||||
|
||||
/**
|
||||
* Force SCM out of Chill mode.
|
||||
*
|
||||
* @return returns true if operation is successful.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean forceExitChillMode() throws IOException;
|
||||
}
|
||||
|
@ -20,8 +20,12 @@
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InChillModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InChillModeResponseProto;
|
||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
@ -317,6 +321,44 @@ public ScmInfo getScmInfo() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if SCM is in chill mode.
|
||||
*
|
||||
* @return Returns true if SCM is in chill mode else returns false.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public boolean inChillMode() throws IOException {
|
||||
InChillModeRequestProto request =
|
||||
InChillModeRequestProto.getDefaultInstance();
|
||||
try {
|
||||
InChillModeResponseProto resp = rpcProxy.inChillMode(
|
||||
NULL_RPC_CONTROLLER, request);
|
||||
return resp.getInChillMode();
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Force SCM out of Chill mode.
|
||||
*
|
||||
* @return returns true if operation is successful.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public boolean forceExitChillMode() throws IOException {
|
||||
ForceExitChillModeRequestProto request =
|
||||
ForceExitChillModeRequestProto.getDefaultInstance();
|
||||
try {
|
||||
ForceExitChillModeResponseProto resp = rpcProxy
|
||||
.forceExitChillMode(NULL_RPC_CONTROLLER, request);
|
||||
return resp.getExitedChillMode();
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getUnderlyingProxyObject() {
|
||||
return rpcProxy;
|
||||
|
@ -21,6 +21,14 @@
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.InChillModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.InChillModeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ForceExitChillModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ForceExitChillModeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||
@ -218,4 +226,28 @@ public HddsProtos.GetScmInfoRespsonseProto getScmInfo(
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public InChillModeResponseProto inChillMode(
|
||||
RpcController controller,
|
||||
InChillModeRequestProto request) throws ServiceException {
|
||||
try {
|
||||
return InChillModeResponseProto.newBuilder()
|
||||
.setInChillMode(impl.inChillMode()).build();
|
||||
} catch (IOException ex) {
|
||||
throw new ServiceException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ForceExitChillModeResponseProto forceExitChillMode(
|
||||
RpcController controller, ForceExitChillModeRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return ForceExitChillModeResponseProto.newBuilder()
|
||||
.setExitedChillMode(impl.forceExitChillMode()).build();
|
||||
} catch (IOException ex) {
|
||||
throw new ServiceException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -150,6 +150,21 @@ message PipelineResponseProto {
|
||||
optional string errorMessage = 3;
|
||||
}
|
||||
|
||||
|
||||
message InChillModeRequestProto {
|
||||
}
|
||||
|
||||
message InChillModeResponseProto {
|
||||
required bool inChillMode = 1;
|
||||
}
|
||||
|
||||
message ForceExitChillModeRequestProto {
|
||||
}
|
||||
|
||||
message ForceExitChillModeResponseProto {
|
||||
required bool exitedChillMode = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol used from an HDFS node to StorageContainerManager. See the request
|
||||
* and response messages for details of the RPC calls.
|
||||
@ -209,4 +224,16 @@ service StorageContainerLocationProtocolService {
|
||||
*/
|
||||
rpc getScmInfo(GetScmInfoRequestProto)
|
||||
returns (GetScmInfoRespsonseProto);
|
||||
|
||||
/**
|
||||
* Checks if SCM is in ChillMode.
|
||||
*/
|
||||
rpc inChillMode(InChillModeRequestProto)
|
||||
returns (InChillModeResponseProto);
|
||||
|
||||
/**
|
||||
* Returns information about SCM.
|
||||
*/
|
||||
rpc forceExitChillMode(ForceExitChillModeRequestProto)
|
||||
returns (ForceExitChillModeResponseProto);
|
||||
}
|
||||
|
@ -204,9 +204,11 @@ public void process(NodeRegistrationContainerReport reportsProto) {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
LOG.info("SCM in chill mode. {} % containers have at least one reported "
|
||||
+ "replica.", (containerWithMinReplicas.get() / maxContainer) * 100);
|
||||
if(inChillMode.get()) {
|
||||
LOG.info("SCM in chill mode. {} % containers have at least one"
|
||||
+ " reported replica.",
|
||||
(containerWithMinReplicas.get() / maxContainer) * 100);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -301,6 +301,28 @@ public ScmInfo getScmInfo() throws IOException {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if SCM is in chill mode.
|
||||
*
|
||||
* @return Returns true if SCM is in chill mode else returns false.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public boolean inChillMode() throws IOException {
|
||||
return scm.isInChillMode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Force SCM out of Chill mode.
|
||||
*
|
||||
* @return returns true if operation is successful.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public boolean forceExitChillMode() throws IOException {
|
||||
return scm.exitChillMode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Queries a list of Node that match a set of statuses.
|
||||
*
|
||||
|
@ -887,6 +887,14 @@ public EventPublisher getEventQueue(){
|
||||
return eventQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Force SCM out of chill mode.
|
||||
*/
|
||||
public boolean exitChillMode() {
|
||||
scmChillModeManager.exitChillMode(eventQueue);
|
||||
return true;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getCurrentContainerThreshold() {
|
||||
return scmChillModeManager.getCurrentContainerThreshold();
|
||||
|
@ -18,13 +18,10 @@
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
@ -35,16 +32,11 @@
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
@ -52,13 +44,9 @@
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
|
||||
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMChillModeManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMStorage;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
@ -69,7 +57,6 @@
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -467,142 +454,4 @@ public void testScmInfo() throws Exception {
|
||||
Assert.assertEquals(clusterId, scmInfo.getClusterId());
|
||||
Assert.assertEquals(scmId, scmInfo.getScmId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSCMChillMode() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
|
||||
.setHbInterval(1000)
|
||||
.setNumDatanodes(3)
|
||||
.setStartDataNodes(false)
|
||||
.setHbProcessorInterval(500);
|
||||
MiniOzoneClusterImpl cluster = (MiniOzoneClusterImpl) builder.build();
|
||||
// Test1: Test chill mode when there are no containers in system.
|
||||
assertTrue(cluster.getStorageContainerManager().isInChillMode());
|
||||
cluster.startHddsDatanodes();
|
||||
cluster.waitForClusterToBeReady();
|
||||
assertFalse(cluster.getStorageContainerManager().isInChillMode());
|
||||
|
||||
// Test2: Test chill mode when containers are there in system.
|
||||
// Create {numKeys} random names keys.
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100*2, 4096);
|
||||
final List<ContainerInfo> containers = cluster.getStorageContainerManager()
|
||||
.getScmContainerManager().getStateManager().getAllContainers();
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return containers.size() > 10;
|
||||
}, 100, 1000);
|
||||
|
||||
// Removing some container to keep them open.
|
||||
containers.remove(0);
|
||||
containers.remove(1);
|
||||
containers.remove(2);
|
||||
containers.remove(3);
|
||||
|
||||
// Close remaining containers
|
||||
ContainerMapping mapping = (ContainerMapping) cluster
|
||||
.getStorageContainerManager().getScmContainerManager();
|
||||
containers.forEach(c -> {
|
||||
try {
|
||||
mapping.updateContainerState(c.getContainerID(),
|
||||
HddsProtos.LifeCycleEvent.FINALIZE);
|
||||
mapping.updateContainerState(c.getContainerID(),
|
||||
LifeCycleEvent.CLOSE);
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to change state of open containers.", e);
|
||||
}
|
||||
});
|
||||
cluster.stop();
|
||||
|
||||
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
|
||||
.captureLogs(SCMChillModeManager.getLogger());
|
||||
logCapturer.clearOutput();
|
||||
AtomicReference<MiniOzoneCluster> miniCluster = new AtomicReference<>();
|
||||
new Thread(() -> {
|
||||
try {
|
||||
miniCluster.set(builder.setStartDataNodes(false).build());
|
||||
} catch (IOException e) {
|
||||
fail("failed");
|
||||
}
|
||||
}).start();
|
||||
|
||||
StorageContainerManager scm;
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return miniCluster.get() != null;
|
||||
}, 100, 1000 * 3);
|
||||
|
||||
scm = miniCluster.get().getStorageContainerManager();
|
||||
assertTrue(scm.isInChillMode());
|
||||
assertFalse(logCapturer.getOutput().contains("SCM exiting chill mode."));
|
||||
assertTrue(scm.getCurrentContainerThreshold() == 0);
|
||||
AtomicDouble curThreshold = new AtomicDouble();
|
||||
AtomicDouble lastReportedThreshold = new AtomicDouble();
|
||||
for(HddsDatanodeService dn:miniCluster.get().getHddsDatanodes()){
|
||||
dn.start(null);
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
curThreshold.set(scm.getCurrentContainerThreshold());
|
||||
return curThreshold.get() > lastReportedThreshold.get();
|
||||
}, 100, 1000 * 5);
|
||||
lastReportedThreshold.set(curThreshold.get());
|
||||
}
|
||||
double chillModeCutoff = conf
|
||||
.getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
|
||||
assertTrue(scm.getCurrentContainerThreshold() >= chillModeCutoff);
|
||||
assertTrue(logCapturer.getOutput().contains("SCM exiting chill mode."));
|
||||
assertFalse(scm.isInChillMode());
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSCMChillModeRestrictedOp() throws Exception {
|
||||
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
|
||||
OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);
|
||||
|
||||
MiniOzoneClusterImpl cluster = (MiniOzoneClusterImpl) MiniOzoneCluster
|
||||
.newBuilder(conf)
|
||||
.setHbInterval(1000)
|
||||
.setHbProcessorInterval(500)
|
||||
.setStartDataNodes(false)
|
||||
.build();
|
||||
|
||||
StorageContainerManager scm = cluster.getStorageContainerManager();
|
||||
assertTrue(scm.isInChillMode());
|
||||
|
||||
LambdaTestUtils.intercept(SCMException.class,
|
||||
"ChillModePrecheck failed for allocateContainer", () -> {
|
||||
scm.getClientProtocolServer()
|
||||
.allocateContainer(ReplicationType.STAND_ALONE,
|
||||
ReplicationFactor.ONE, "");
|
||||
});
|
||||
|
||||
cluster.startHddsDatanodes();
|
||||
cluster.waitForClusterToBeReady();
|
||||
assertFalse(scm.isInChillMode());
|
||||
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
helper.createKeys(10, 4096);
|
||||
SCMClientProtocolServer clientProtocolServer = cluster
|
||||
.getStorageContainerManager().getClientProtocolServer();
|
||||
assertFalse((scm.getClientProtocolServer()).getChillModeStatus());
|
||||
final List<ContainerInfo> containers = scm.getScmContainerManager()
|
||||
.getStateManager().getAllContainers();
|
||||
scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS, true);
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return clientProtocolServer.getChillModeStatus();
|
||||
}, 50, 1000 * 5);
|
||||
assertTrue(clientProtocolServer.getChillModeStatus());
|
||||
|
||||
LambdaTestUtils.intercept(SCMException.class,
|
||||
"Open container " + containers.get(0).getContainerID() + " "
|
||||
+ "doesn't have enough replicas to service this operation in Chill"
|
||||
+ " mode.", () -> clientProtocolServer
|
||||
.getContainerWithPipeline(containers.get(0).getContainerID()));
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,15 +17,30 @@
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMChillModeManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.ozone.HddsDatanodeService;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.MiniOzoneClusterImpl;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.TestStorageContainerManagerHelper;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
@ -41,7 +56,11 @@
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
@ -49,10 +68,14 @@
|
||||
*/
|
||||
public class TestScmChillMode {
|
||||
|
||||
private final static Logger LOG = LoggerFactory
|
||||
.getLogger(TestScmChillMode.class);
|
||||
private static MiniOzoneCluster cluster = null;
|
||||
private static MiniOzoneCluster.Builder builder = null;
|
||||
private static OzoneConfiguration conf;
|
||||
private static OzoneManager om;
|
||||
private static StorageContainerLocationProtocolClientSideTranslatorPB
|
||||
storageContainerLocationClient;
|
||||
|
||||
|
||||
@Rule
|
||||
@ -77,6 +100,8 @@ public void init() throws Exception {
|
||||
cluster.startHddsDatanodes();
|
||||
cluster.waitForClusterToBeReady();
|
||||
om = cluster.getOzoneManager();
|
||||
storageContainerLocationClient = cluster
|
||||
.getStorageContainerLocationClient();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -85,7 +110,11 @@ public void init() throws Exception {
|
||||
@After
|
||||
public void shutdown() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
try {
|
||||
cluster.shutdown();
|
||||
} catch (Exception e) {
|
||||
// do nothing.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -93,79 +122,244 @@ public void shutdown() {
|
||||
public void testChillModeOperations() throws Exception {
|
||||
final AtomicReference<MiniOzoneCluster> miniCluster =
|
||||
new AtomicReference<>();
|
||||
// Create {numKeys} random names keys.
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100, 4096);
|
||||
final List<ContainerInfo> containers = cluster
|
||||
.getStorageContainerManager()
|
||||
.getScmContainerManager().getStateManager().getAllContainers();
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return containers.size() > 10;
|
||||
}, 100, 1000);
|
||||
|
||||
try {
|
||||
// Create {numKeys} random names keys.
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100, 4096);
|
||||
final List<ContainerInfo> containers = cluster
|
||||
.getStorageContainerManager()
|
||||
.getScmContainerManager().getStateManager().getAllContainers();
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return containers.size() > 10;
|
||||
}, 100, 1000);
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
|
||||
String keyName = "key" + RandomStringUtils.randomNumeric(5);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
||||
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setDataSize(1000)
|
||||
.build();
|
||||
OmVolumeArgs volArgs = new OmVolumeArgs.Builder()
|
||||
.setAdminName(adminName)
|
||||
.setCreationTime(Time.monotonicNow())
|
||||
.setQuotaInBytes(10000)
|
||||
.setVolume(volumeName)
|
||||
.setOwnerName(userName)
|
||||
.build();
|
||||
OmBucketInfo bucketInfo = new OmBucketInfo.Builder()
|
||||
.setBucketName(bucketName)
|
||||
.setIsVersionEnabled(false)
|
||||
.setVolumeName(volumeName)
|
||||
.build();
|
||||
om.createVolume(volArgs);
|
||||
om.createBucket(bucketInfo);
|
||||
om.openKey(keyArgs);
|
||||
//om.commitKey(keyArgs, 1);
|
||||
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
|
||||
String keyName = "key" + RandomStringUtils.randomNumeric(5);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
||||
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setDataSize(1000)
|
||||
.build();
|
||||
OmVolumeArgs volArgs = new OmVolumeArgs.Builder()
|
||||
.setAdminName(adminName)
|
||||
.setCreationTime(Time.monotonicNow())
|
||||
.setQuotaInBytes(10000)
|
||||
.setVolume(volumeName)
|
||||
.setOwnerName(userName)
|
||||
.build();
|
||||
OmBucketInfo bucketInfo = new OmBucketInfo.Builder()
|
||||
.setBucketName(bucketName)
|
||||
.setIsVersionEnabled(false)
|
||||
.setVolumeName(volumeName)
|
||||
.build();
|
||||
om.createVolume(volArgs);
|
||||
om.createBucket(bucketInfo);
|
||||
om.openKey(keyArgs);
|
||||
//om.commitKey(keyArgs, 1);
|
||||
cluster.stop();
|
||||
|
||||
cluster.stop();
|
||||
|
||||
new Thread(() -> {
|
||||
try {
|
||||
miniCluster.set(builder.build());
|
||||
} catch (IOException e) {
|
||||
fail("failed");
|
||||
}
|
||||
}).start();
|
||||
|
||||
StorageContainerManager scm;
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return miniCluster.get() != null;
|
||||
}, 100, 1000 * 3);
|
||||
|
||||
scm = miniCluster.get().getStorageContainerManager();
|
||||
Assert.assertTrue(scm.isInChillMode());
|
||||
|
||||
om = miniCluster.get().getOzoneManager();
|
||||
|
||||
LambdaTestUtils.intercept(OMException.class,
|
||||
"ChillModePrecheck failed for allocateBlock",
|
||||
() -> om.openKey(keyArgs));
|
||||
|
||||
} finally {
|
||||
if (miniCluster.get() != null) {
|
||||
try {
|
||||
miniCluster.get().shutdown();
|
||||
} catch (Exception e) {
|
||||
// do nothing.
|
||||
}
|
||||
new Thread(() -> {
|
||||
try {
|
||||
miniCluster.set(builder.build());
|
||||
} catch (IOException e) {
|
||||
fail("failed");
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
|
||||
StorageContainerManager scm;
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return miniCluster.get() != null;
|
||||
}, 100, 1000 * 3);
|
||||
cluster = miniCluster.get();
|
||||
|
||||
scm = cluster.getStorageContainerManager();
|
||||
Assert.assertTrue(scm.isInChillMode());
|
||||
|
||||
om = miniCluster.get().getOzoneManager();
|
||||
|
||||
LambdaTestUtils.intercept(OMException.class,
|
||||
"ChillModePrecheck failed for allocateBlock",
|
||||
() -> om.openKey(keyArgs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests inChillMode & forceExitChillMode api calls.
|
||||
*/
|
||||
@Test
|
||||
public void testIsScmInChillModeAndForceExit() throws Exception {
|
||||
final AtomicReference<MiniOzoneCluster> miniCluster =
|
||||
new AtomicReference<>();
|
||||
// Test 1: SCM should be out of chill mode.
|
||||
Assert.assertFalse(storageContainerLocationClient.inChillMode());
|
||||
cluster.stop();
|
||||
// Restart the cluster with same metadata dir.
|
||||
new Thread(() -> {
|
||||
try {
|
||||
miniCluster.set(builder.build());
|
||||
} catch (IOException e) {
|
||||
Assert.fail("Cluster startup failed.");
|
||||
}
|
||||
}).start();
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return miniCluster.get() != null;
|
||||
}, 10, 1000 * 3);
|
||||
cluster = miniCluster.get();
|
||||
|
||||
// Test 2: Scm should be in chill mode as datanodes are not started yet.
|
||||
storageContainerLocationClient = cluster
|
||||
.getStorageContainerLocationClient();
|
||||
Assert.assertTrue(storageContainerLocationClient.inChillMode());
|
||||
// Force scm out of chill mode.
|
||||
cluster.getStorageContainerManager().getClientProtocolServer()
|
||||
.forceExitChillMode();
|
||||
// Test 3: SCM should be out of chill mode.
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
return !cluster.getStorageContainerManager().getClientProtocolServer()
|
||||
.inChillMode();
|
||||
} catch (IOException e) {
|
||||
Assert.fail("Cluster");
|
||||
return false;
|
||||
}
|
||||
}, 10, 1000 * 5);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSCMChillMode() throws Exception {
|
||||
MiniOzoneCluster.Builder clusterBuilder = MiniOzoneCluster.newBuilder(conf)
|
||||
.setHbInterval(1000)
|
||||
.setNumDatanodes(3)
|
||||
.setStartDataNodes(false)
|
||||
.setHbProcessorInterval(500);
|
||||
MiniOzoneClusterImpl miniCluster = (MiniOzoneClusterImpl) clusterBuilder
|
||||
.build();
|
||||
// Test1: Test chill mode when there are no containers in system.
|
||||
assertTrue(miniCluster.getStorageContainerManager().isInChillMode());
|
||||
miniCluster.startHddsDatanodes();
|
||||
miniCluster.waitForClusterToBeReady();
|
||||
assertFalse(miniCluster.getStorageContainerManager().isInChillMode());
|
||||
|
||||
// Test2: Test chill mode when containers are there in system.
|
||||
// Create {numKeys} random names keys.
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(miniCluster, conf);
|
||||
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100 * 2, 4096);
|
||||
final List<ContainerInfo> containers = miniCluster
|
||||
.getStorageContainerManager().getScmContainerManager()
|
||||
.getStateManager().getAllContainers();
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return containers.size() > 10;
|
||||
}, 100, 1000 * 2);
|
||||
|
||||
// Removing some container to keep them open.
|
||||
containers.remove(0);
|
||||
containers.remove(1);
|
||||
containers.remove(2);
|
||||
containers.remove(3);
|
||||
|
||||
// Close remaining containers
|
||||
ContainerMapping mapping = (ContainerMapping) miniCluster
|
||||
.getStorageContainerManager().getScmContainerManager();
|
||||
containers.forEach(c -> {
|
||||
try {
|
||||
mapping.updateContainerState(c.getContainerID(),
|
||||
HddsProtos.LifeCycleEvent.FINALIZE);
|
||||
mapping.updateContainerState(c.getContainerID(),
|
||||
LifeCycleEvent.CLOSE);
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to change state of open containers.", e);
|
||||
}
|
||||
});
|
||||
miniCluster.stop();
|
||||
|
||||
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
|
||||
.captureLogs(SCMChillModeManager.getLogger());
|
||||
logCapturer.clearOutput();
|
||||
AtomicReference<MiniOzoneCluster> miniClusterOzone
|
||||
= new AtomicReference<>();
|
||||
new Thread(() -> {
|
||||
try {
|
||||
miniClusterOzone.set(clusterBuilder.setStartDataNodes(false).build());
|
||||
} catch (IOException e) {
|
||||
fail("failed");
|
||||
}
|
||||
}).start();
|
||||
|
||||
StorageContainerManager scm;
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return miniClusterOzone.get() != null;
|
||||
}, 100, 1000 * 3);
|
||||
|
||||
miniCluster = (MiniOzoneClusterImpl) miniClusterOzone.get();
|
||||
|
||||
scm = miniCluster.getStorageContainerManager();
|
||||
assertTrue(scm.isInChillMode());
|
||||
assertFalse(logCapturer.getOutput().contains("SCM exiting chill mode."));
|
||||
assertTrue(scm.getCurrentContainerThreshold() == 0);
|
||||
AtomicDouble curThreshold = new AtomicDouble();
|
||||
AtomicDouble lastReportedThreshold = new AtomicDouble();
|
||||
for (HddsDatanodeService dn : miniCluster.getHddsDatanodes()) {
|
||||
dn.start(null);
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
curThreshold.set(scm.getCurrentContainerThreshold());
|
||||
return curThreshold.get() > lastReportedThreshold.get();
|
||||
}, 100, 1000 * 5);
|
||||
lastReportedThreshold.set(curThreshold.get());
|
||||
}
|
||||
cluster = miniCluster;
|
||||
double chillModeCutoff = conf
|
||||
.getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
|
||||
assertTrue(scm.getCurrentContainerThreshold() >= chillModeCutoff);
|
||||
assertTrue(logCapturer.getOutput().contains("SCM exiting chill mode."));
|
||||
assertFalse(scm.isInChillMode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSCMChillModeRestrictedOp() throws Exception {
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
|
||||
OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);
|
||||
cluster.stop();
|
||||
cluster = builder.build();
|
||||
StorageContainerManager scm = cluster.getStorageContainerManager();
|
||||
assertTrue(scm.isInChillMode());
|
||||
|
||||
LambdaTestUtils.intercept(SCMException.class,
|
||||
"ChillModePrecheck failed for allocateContainer", () -> {
|
||||
scm.getClientProtocolServer()
|
||||
.allocateContainer(ReplicationType.STAND_ALONE,
|
||||
ReplicationFactor.ONE, "");
|
||||
});
|
||||
|
||||
cluster.startHddsDatanodes();
|
||||
cluster.waitForClusterToBeReady();
|
||||
assertFalse(scm.isInChillMode());
|
||||
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
helper.createKeys(10, 4096);
|
||||
SCMClientProtocolServer clientProtocolServer = cluster
|
||||
.getStorageContainerManager().getClientProtocolServer();
|
||||
assertFalse((scm.getClientProtocolServer()).getChillModeStatus());
|
||||
final List<ContainerInfo> containers = scm.getScmContainerManager()
|
||||
.getStateManager().getAllContainers();
|
||||
scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS, true);
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return clientProtocolServer.getChillModeStatus();
|
||||
}, 50, 1000 * 5);
|
||||
assertTrue(clientProtocolServer.getChillModeStatus());
|
||||
|
||||
LambdaTestUtils.intercept(SCMException.class,
|
||||
"Open container " + containers.get(0).getContainerID() + " "
|
||||
+ "doesn't have enough replicas to service this operation in Chill"
|
||||
+ " mode.", () -> clientProtocolServer
|
||||
.getContainerWithPipeline(containers.get(0).getContainerID()));
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user