HDDS-158. DatanodeStateMachine endPoint task throws NullPointerException. Contributed by Nanda Kumar.

This commit is contained in:
Mukul Kumar Singh 2018-06-12 20:36:23 +05:30
parent b3612dd90c
commit aa7614c5f3
2 changed files with 88 additions and 3 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.states.endpoint; package org.apache.hadoop.ozone.container.common.states.endpoint;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage; import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -125,9 +126,14 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
*/ */
private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) { private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
for (GeneratedMessage report : context.getAllAvailableReports()) { for (GeneratedMessage report : context.getAllAvailableReports()) {
requestBuilder.setField( String reportName = report.getDescriptorForType().getFullName();
SCMHeartbeatRequestProto.getDescriptor().findFieldByName( for (Descriptors.FieldDescriptor descriptor :
report.getDescriptorForType().getName()), report); SCMHeartbeatRequestProto.getDescriptor().getFields()) {
String heartbeatFieldName = descriptor.getMessageType().getFullName();
if (heartbeatFieldName.equals(reportName)) {
requestBuilder.setField(descriptor, report);
}
}
} }
} }

View File

@ -18,13 +18,25 @@
package org.apache.hadoop.ozone.container.common.report; package org.apache.hadoop.ozone.container.common.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage; import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -103,4 +115,71 @@ public void testPublishReport() throws InterruptedException {
} }
@Test
public void testAddingReportToHeartbeat() {
Configuration conf = new OzoneConfiguration();
ReportPublisherFactory factory = new ReportPublisherFactory(conf);
ReportPublisher nodeReportPublisher = factory.getPublisherFor(
NodeReportProto.class);
ReportPublisher containerReportPubliser = factory.getPublisherFor(
ContainerReportsProto.class);
GeneratedMessage nodeReport = nodeReportPublisher.getReport();
GeneratedMessage containerReport = containerReportPubliser.getReport();
SCMHeartbeatRequestProto.Builder heartbeatBuilder =
SCMHeartbeatRequestProto.newBuilder();
heartbeatBuilder.setDatanodeDetails(
getDatanodeDetails().getProtoBufMessage());
addReport(heartbeatBuilder, nodeReport);
addReport(heartbeatBuilder, containerReport);
SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
Assert.assertTrue(heartbeat.hasNodeReport());
Assert.assertTrue(heartbeat.hasContainerReport());
}
/**
* Get a datanode details.
*
* @return DatanodeDetails
*/
private static DatanodeDetails getDatanodeDetails() {
String uuid = UUID.randomUUID().toString();
Random random = new Random();
String ipAddress =
random.nextInt(256) + "." + random.nextInt(256) + "." + random
.nextInt(256) + "." + random.nextInt(256);
DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.STANDALONE, 0);
DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.RATIS, 0);
DatanodeDetails.Port restPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.REST, 0);
DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
builder.setUuid(uuid)
.setHostName("localhost")
.setIpAddress(ipAddress)
.addPort(containerPort)
.addPort(ratisPort)
.addPort(restPort);
return builder.build();
}
/**
* Adds the report to heartbeat.
*
* @param requestBuilder builder to which the report has to be added.
* @param report the report to be added.
*/
private static void addReport(SCMHeartbeatRequestProto.Builder requestBuilder,
GeneratedMessage report) {
String reportName = report.getDescriptorForType().getFullName();
for (Descriptors.FieldDescriptor descriptor :
SCMHeartbeatRequestProto.getDescriptor().getFields()) {
String heartbeatFieldName = descriptor.getMessageType().getFullName();
if (heartbeatFieldName.equals(reportName)) {
requestBuilder.setField(descriptor, report);
}
}
}
} }