HDFS-11108. Ozone: use containers with the state machine. Contributed by Anu Engineer

This commit is contained in:
Anu Engineer 2016-11-17 13:01:09 -08:00 committed by Owen O'Malley
parent 8bd85268e6
commit 05b44e1ad8
5 changed files with 84 additions and 75 deletions

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
@ -43,27 +44,26 @@ public class DatanodeStateMachine implements Closeable {
private final ExecutorService executorService;
private final Configuration conf;
private final SCMConnectionManager connectionManager;
private final long taskWaitTime;
private final long heartbeatFrequency;
private StateContext context;
private final OzoneContainer container;
/**
* Constructs a container state machine.
* Constructs a a datanode state machine.
*
* @param conf - Configration.
*/
public DatanodeStateMachine(Configuration conf) {
public DatanodeStateMachine(Configuration conf) throws IOException {
this.conf = conf;
executorService = HadoopExecutors.newScheduledThreadPool(
this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Container State Machine Thread - %d").build());
.setNameFormat("Datanode State Machine Thread - %d").build());
connectionManager = new SCMConnectionManager(conf);
context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
taskWaitTime = this.conf.getLong(OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT,
OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT_DEFAULT);
heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf);
container = new OzoneContainer(conf);
}
/**
@ -81,10 +81,12 @@ public SCMConnectionManager getConnectionManager() {
public void start() throws IOException {
long now = 0;
long nextHB = 0;
container.start();
while (context.getState() != DatanodeStates.SHUTDOWN) {
try {
nextHB = Time.monotonicNow() + heartbeatFrequency;
context.execute(executorService, taskWaitTime, TimeUnit.SECONDS);
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
now = Time.monotonicNow();
if (now < nextHB) {
Thread.sleep(nextHB - now);
@ -144,6 +146,10 @@ public void close() throws IOException {
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
endPoint.close();
}
if(container != null) {
container.stop();
}
}
/**
@ -159,7 +165,7 @@ public enum DatanodeStates {
/**
* Constructs ContainerStates.
*
* @param value
* @param value Enum Value
*/
DatanodeStates(int value) {
this.value = value;
@ -210,4 +216,22 @@ public DatanodeStates getNextState() {
return getLastState();
}
}
public static DatanodeStateMachine initStateMachine(Configuration conf)
throws IOException {
DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
Runnable startStateMachineTask = () -> {
try {
stateMachine.start();
} catch (Exception ex) {
LOG.error("Unable to start the DatanodeState Machine", ex);
}
};
Thread thread = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Datanode State Machine Thread - %d")
.build().newThread(startStateMachineTask);
thread.start();
return stateMachine;
}
}

View File

@ -30,6 +30,8 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import java.io.IOException;
/**
* Creates a netty server endpoint that acts as the communication layer for
* Ozone containers.
@ -58,9 +60,9 @@ public XceiverServer(Configuration conf,
/**
* Starts running the server.
*
* @throws Exception
* @throws IOException
*/
public void start() throws Exception {
public void start() throws IOException {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
channel = new ServerBootstrap()

View File

@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -19,8 +19,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
@ -38,6 +36,8 @@
import java.util.LinkedList;
import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
/**
* Ozone main class sets up the network server and initializes the container
* layer.
@ -57,12 +57,11 @@ public class OzoneContainer {
* Creates a network endpoint and enables Ozone container.
*
* @param ozoneConfig - Config
* @param dataSet - FsDataset.
* @throws IOException
*/
public OzoneContainer(
Configuration ozoneConfig,
FsDatasetSpi<? extends FsVolumeSpi> dataSet) throws Exception {
Configuration ozoneConfig) throws IOException {
this.ozoneConfig = ozoneConfig;
List<StorageLocation> locations = new LinkedList<>();
String[] paths = ozoneConfig.getStrings(
OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS);
@ -71,11 +70,9 @@ public OzoneContainer(
locations.add(StorageLocation.parse(p));
}
} else {
getDataDir(dataSet, locations);
getDataDir(locations);
}
this.ozoneConfig = ozoneConfig;
manager = new ContainerManagerImpl();
manager.init(this.ozoneConfig, locations);
this.chunkManager = new ChunkManagerImpl(manager);
@ -90,43 +87,44 @@ public OzoneContainer(
/**
* Starts serving requests to ozone container.
* @throws Exception
*
* @throws IOException
*/
public void start() throws Exception {
public void start() throws IOException {
server.start();
}
/**
* Stops the ozone container.
*
* Shutdown logic is not very obvious from the following code.
* if you need to modify the logic, please keep these comments in mind.
* Here is the shutdown sequence.
*
* <p>
* Shutdown logic is not very obvious from the following code. if you need to
* modify the logic, please keep these comments in mind. Here is the shutdown
* sequence.
* <p>
* 1. We shutdown the network ports.
*
* <p>
* 2. Now we need to wait for all requests in-flight to finish.
*
* 3. The container manager lock is a read-write lock with "Fairness" enabled.
*
* <p>
* 3. The container manager lock is a read-write lock with "Fairness"
* enabled.
* <p>
* 4. This means that the waiting threads are served in a "first-come-first
* -served" manner. Please note that this applies to waiting threads only.
*
* <p>
* 5. Since write locks are exclusive, if we are waiting to get a lock it
* implies that we are waiting for in-flight operations to complete.
*
* <p>
* 6. if there are other write operations waiting on the reader-writer lock,
* fairness guarantees that they will proceed before the shutdown lock
* request.
*
* <p>
* 7. Since all operations either take a reader or writer lock of container
* manager, we are guaranteed that we are the last operation since we have
* closed the network port, and we wait until close is successful.
*
* <p>
* 8. We take the writer lock and call shutdown on each of the managers in
* reverse order. That is chunkManager, keyManager and containerManager is
* shutdown.
*
*/
public void stop() {
LOG.info("Attempting to stop container services.");
@ -144,26 +142,14 @@ public void stop() {
/**
* Returns a paths to data dirs.
* @param dataset - FSDataset.
*
* @param pathList - List of paths.
* @throws IOException
*/
private void getDataDir(
FsDatasetSpi<? extends FsVolumeSpi> dataset,
List<StorageLocation> pathList) throws IOException {
FsDatasetSpi.FsVolumeReferences references;
try {
synchronized (dataset) {
references = dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
pathList.add(StorageLocation.parse(vol.getBaseURI().getPath()));
}
references.close();
}
} catch (IOException ex) {
LOG.error("Unable to get volume paths.", ex);
throw new IOException("Internal error", ex);
private void getDataDir(List<StorageLocation> pathList) throws IOException {
for (String dir : ozoneConfig.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
StorageLocation location = StorageLocation.parse(dir);
pathList.add(location);
}
}
}

View File

@ -20,19 +20,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
@ -54,18 +47,20 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
/**
* Tests the datanode state machine class and its states.
*/
public class TestDatanodeStateMachine {
private static final Logger LOG =
LoggerFactory.getLogger(TestDatanodeStateMachine.class);
private final int scmServerCount = 3;
private List<String> serverAddresses;
private List<RPC.Server> scmServers;
private List<ScmTestMock> mockServers;
private ExecutorService executorService;
private Configuration conf;
private static final Logger LOG =
LoggerFactory.getLogger(TestDatanodeStateMachine.class);
@Before
public void setUp() throws Exception {
@ -91,14 +86,15 @@ public void setUp() throws Exception {
String path = p.getPath().concat(
TestDatanodeStateMachine.class.getSimpleName());
File f = new File(path);
if(!f.mkdirs()) {
if (!f.mkdirs()) {
LOG.info("Required directories already exist.");
}
conf.set(DFS_DATANODE_DATA_DIR_KEY, path);
path = Paths.get(path.toString(),
TestDatanodeStateMachine.class.getSimpleName() + ".id").toString();
conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, path);
executorService = HadoopExecutors.newScheduledThreadPool(
conf.getInt(
OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
@ -122,7 +118,6 @@ public void tearDown() throws Exception {
/**
* Assert that starting statemachine executes the Init State.
*
* @throws IOException
* @throws InterruptedException
*/
@Test
@ -132,7 +127,7 @@ public void testDatanodeStateMachineStartThread() throws IOException,
Runnable startStateMachineTask = () -> {
try {
stateMachine.start();
} catch (IOException ex) {
} catch (Exception ex) {
}
};
Thread thread1 = new Thread(startStateMachineTask);

View File

@ -32,6 +32,9 @@
import java.net.URL;
/**
* Tests ozone containers.
*/
public class TestOzoneContainer {
/**
* Set the timeout for every test.
@ -55,12 +58,11 @@ public void testCreateOzoneContainer() throws Exception {
// We don't start Ozone Container via data node, we will do it
// independently in our test path.
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline
(containerName);
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes
().get(0).getFSDataset());
OzoneContainer container = new OzoneContainer(conf);
container.start();
XceiverClient client = new XceiverClient(pipeline, conf);