HDFS-11480. Ozone: TestEndpoint task failure. Contributed by Xiaoyu Yao.

This commit is contained in:
Anu Engineer 2017-03-02 11:18:41 -08:00 committed by Owen O'Malley
parent 2cba0daddc
commit a2328d36ac

View File

@ -17,10 +17,13 @@
package org.apache.hadoop.ozone.container.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.states.endpoint
.HeartbeatEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint
@ -31,6 +34,10 @@
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
@ -38,6 +45,7 @@
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.ozone.scm.VersionInfo;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Assert;
@ -45,9 +53,12 @@
import org.junit.Test;
import org.mockito.internal.matchers.LessOrEqual;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.UUID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
/**
* Tests the endpoints.
*/
@ -55,6 +66,7 @@ public class TestEndPoint {
private static InetSocketAddress serverAddress;
private static RPC.Server scmServer;
private static ScmTestMock scmServerImpl;
private static File testDir;
@Test
/**
@ -169,12 +181,13 @@ public void testRegister() throws Exception {
private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
int rpcTimeout, boolean clearContainerID) throws Exception {
Configuration conf = SCMTestUtils.getConf();
EndpointStateMachine rpcEndPoint =
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
SCMTestUtils.createEndpoint(conf,
scmAddress, rpcTimeout);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
RegisterEndpointTask endpointTask =
new RegisterEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
new RegisterEndpointTask(rpcEndPoint, conf);
if (!clearContainerID) {
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
.setClusterID(UUID.randomUUID().toString())
@ -236,8 +249,13 @@ public void testHeartbeat() throws Exception {
try (EndpointStateMachine rpcEndPoint =
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
srb.setStorageUuid(UUID.randomUUID().toString());
srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build();
nrb.addStorageReport(srb);
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
.sendHeartbeat(dataNode, null);
.sendHeartbeat(dataNode, nrb.build());
Assert.assertNotNull(responseProto);
Assert.assertEquals(1, responseProto.getCommandsCount());
Assert.assertNotNull(responseProto.getCommandsList().get(0));
@ -248,16 +266,24 @@ public void testHeartbeat() throws Exception {
private EndpointStateMachine heartbeatTaskHelper(InetSocketAddress scmAddress,
int rpcTimeout) throws Exception {
Configuration conf = SCMTestUtils.getConf();
EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(
SCMTestUtils.getConf(),
scmAddress, rpcTimeout);
conf, scmAddress, rpcTimeout);
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
.setClusterID(UUID.randomUUID().toString())
.setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
.build();
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
// Create a datanode state machine for stateConext used by endpoint task
conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
final StateContext stateContext = new StateContext(conf,
DatanodeStateMachine.DatanodeStates.RUNNING,
stateMachine);
HeartbeatEndpointTask endpointTask =
new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf(), null);
new HeartbeatEndpointTask(rpcEndPoint, conf, stateContext);
endpointTask.setContainerNodeIDProto(containerNodeID);
endpointTask.call();
Assert.assertNotNull(endpointTask.getContainerNodeIDProto());
@ -302,6 +328,7 @@ public static void tearDown() throws Exception {
if (scmServer != null) {
scmServer.stop();
}
FileUtil.fullyDelete(testDir);
}
@BeforeClass
@ -310,5 +337,6 @@ public static void setUp() throws Exception {
scmServerImpl = new ScmTestMock();
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
scmServerImpl, serverAddress, 10);
testDir = PathUtils.getTestDir(TestEndPoint.class);
}
}