HDFS-11668. Ozone: misc improvements for SCM CLI. Contributed by Weiwei Yang.
This commit is contained in:
parent
5984d40b86
commit
1a245accb5
@ -23,6 +23,8 @@
|
|||||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
|
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
@ -42,6 +44,8 @@
|
|||||||
* Current Context of State Machine.
|
* Current Context of State Machine.
|
||||||
*/
|
*/
|
||||||
public class StateContext {
|
public class StateContext {
|
||||||
|
static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(StateContext.class);
|
||||||
private final Queue<SCMCommand> commandQueue;
|
private final Queue<SCMCommand> commandQueue;
|
||||||
private final Lock lock;
|
private final Lock lock;
|
||||||
private final DatanodeStateMachine parent;
|
private final DatanodeStateMachine parent;
|
||||||
@ -187,6 +191,10 @@ public void execute(ExecutorService service, long time, TimeUnit unit)
|
|||||||
task.execute(service);
|
task.execute(service);
|
||||||
DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
|
DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
|
||||||
if (this.state != newState) {
|
if (this.state != newState) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Task {} executed, state transited from {} to {}",
|
||||||
|
task.getClass().getSimpleName(), this.state, newState);
|
||||||
|
}
|
||||||
if (isExiting(newState)) {
|
if (isExiting(newState)) {
|
||||||
task.onExit();
|
task.onExit();
|
||||||
}
|
}
|
||||||
|
@ -105,6 +105,12 @@ public DatanodeStateMachine.DatanodeStates call() throws Exception {
|
|||||||
*/
|
*/
|
||||||
private void persistContainerDatanodeID() throws IOException {
|
private void persistContainerDatanodeID() throws IOException {
|
||||||
String dataNodeIDPath = conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID);
|
String dataNodeIDPath = conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID);
|
||||||
|
if (Strings.isNullOrEmpty(dataNodeIDPath)) {
|
||||||
|
LOG.error("A valid file path is needed for config setting {}",
|
||||||
|
ScmConfigKeys.OZONE_SCM_DATANODE_ID);
|
||||||
|
this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
|
||||||
|
return;
|
||||||
|
}
|
||||||
File idPath = new File(dataNodeIDPath);
|
File idPath = new File(dataNodeIDPath);
|
||||||
int containerPort = this.context.getContainerPort();
|
int containerPort = this.context.getContainerPort();
|
||||||
DatanodeID datanodeID = this.context.getParent().getDatanodeID();
|
DatanodeID datanodeID = this.context.getParent().getDatanodeID();
|
||||||
|
@ -56,7 +56,7 @@ public void execute(CommandLine cmd) throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
String pipelineID = cmd.getOptionValue(PIPELINE_ID);
|
String pipelineID = cmd.getOptionValue(PIPELINE_ID);
|
||||||
LOG.info("Create container :" + pipelineID + " " + getScmClient());
|
LOG.info("Create container : {}", pipelineID);
|
||||||
getScmClient().createContainer(pipelineID);
|
getScmClient().createContainer(pipelineID);
|
||||||
LOG.debug("Container creation returned");
|
LOG.debug("Container creation returned");
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.container.common;
|
package org.apache.hadoop.ozone.container.common;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
@ -46,6 +47,7 @@
|
|||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -294,21 +296,40 @@ public void testDatanodeStateContext() throws IOException,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test state transition with a list of invalid SCM names,
|
* Test state transition with a list of invalid scm configurations,
|
||||||
* and verify the state transits to SHUTDOWN each time.
|
* and verify the state transits to SHUTDOWN each time.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDatanodeStateMachineWithInvalidSCMNames()
|
public void testDatanodeStateMachineWithInvalidConfiguration()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
for (String name : new String[] {
|
|
||||||
"", // Empty
|
LinkedList<Map.Entry<String, String>> confList =
|
||||||
"x..y", // Invalid schema
|
new LinkedList<Map.Entry<String, String>>();
|
||||||
"scm:xyz", // Invalid port
|
confList.add(Maps.immutableEntry(ScmConfigKeys.OZONE_SCM_NAMES, ""));
|
||||||
"scm:123456" // Port out of range
|
|
||||||
}) {
|
// Invalid ozone.scm.names
|
||||||
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, name);
|
/** Empty **/
|
||||||
|
confList.add(Maps.immutableEntry(
|
||||||
|
ScmConfigKeys.OZONE_SCM_NAMES, ""));
|
||||||
|
/** Invalid schema **/
|
||||||
|
confList.add(Maps.immutableEntry(
|
||||||
|
ScmConfigKeys.OZONE_SCM_NAMES, "x..y"));
|
||||||
|
/** Invalid port **/
|
||||||
|
confList.add(Maps.immutableEntry(
|
||||||
|
ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz"));
|
||||||
|
/** Port out of range **/
|
||||||
|
confList.add(Maps.immutableEntry(
|
||||||
|
ScmConfigKeys.OZONE_SCM_NAMES, "scm:123456"));
|
||||||
|
// Invalid ozone.scm.datanode.id
|
||||||
|
/** Empty **/
|
||||||
|
confList.add(Maps.immutableEntry(
|
||||||
|
ScmConfigKeys.OZONE_SCM_DATANODE_ID, ""));
|
||||||
|
|
||||||
|
confList.forEach((entry) -> {
|
||||||
|
Configuration perTestConf = new Configuration(conf);
|
||||||
|
perTestConf.setStrings(entry.getKey(), entry.getValue());
|
||||||
try (DatanodeStateMachine stateMachine =
|
try (DatanodeStateMachine stateMachine =
|
||||||
new DatanodeStateMachine(conf)) {
|
new DatanodeStateMachine(perTestConf)) {
|
||||||
DatanodeStateMachine.DatanodeStates currentState =
|
DatanodeStateMachine.DatanodeStates currentState =
|
||||||
stateMachine.getContext().getState();
|
stateMachine.getContext().getState();
|
||||||
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
|
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
|
||||||
@ -320,7 +341,9 @@ public void testDatanodeStateMachineWithInvalidSCMNames()
|
|||||||
task.await(2, TimeUnit.SECONDS);
|
task.await(2, TimeUnit.SECONDS);
|
||||||
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN,
|
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN,
|
||||||
newState);
|
newState);
|
||||||
}
|
} catch (Exception e) {
|
||||||
}
|
Assert.fail("Unexpected exception found");
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user