YARN-7210. Some NPE fixes in Registry DNS. Contributed by Jian He

This commit is contained in:
Billie Rinaldi 2017-09-21 10:18:42 -07:00 committed by Jian He
parent 37c9b7327d
commit ce74e64363
12 changed files with 53 additions and 30 deletions

View File

@ -344,11 +344,7 @@ private void registerServiceInstance(ApplicationAttemptId attemptId,
attemptId.getApplicationId().toString()); attemptId.getApplicationId().toString());
serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE, serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE,
PersistencePolicies.APPLICATION); PersistencePolicies.APPLICATION);
serviceRecord.description = "Yarn Service Master"; serviceRecord.description = "YarnServiceMaster";
serviceRecord.addExternalEndpoint(RegistryTypeUtils
.ipcEndpoint("classpath:org.apache.hadoop.yarn.service.appmaster.ipc",
context.clientAMService.getBindAddress()));
// set any provided attributes // set any provided attributes
setUserProvidedServiceRecordAttributes(service.getConfiguration(), setUserProvidedServiceRecordAttributes(service.getConfiguration(),

View File

@ -170,6 +170,9 @@ private Service loadAppJsonFromLocalFS(
if (!StringUtils.isEmpty(args.getServiceName())) { if (!StringUtils.isEmpty(args.getServiceName())) {
service.setName(args.getServiceName()); service.setName(args.getServiceName());
} }
if (!StringUtils.isEmpty(args.queue)) {
service.setQueue(args.queue);
}
return service; return service;
} }

View File

@ -37,7 +37,7 @@ public File getFile() {
} }
@Parameter(names = { @Parameter(names = {
ARG_QUEUE }, description = "Queue to submit the service") ARG_QUEUE, ARG_SHORT_QUEUE}, description = "Queue to submit the service")
public String queue; public String queue;
@Parameter(names = { @Parameter(names = {

View File

@ -77,6 +77,7 @@ public interface Arguments {
String ARG_PATH = "--path"; String ARG_PATH = "--path";
String ARG_PRINCIPAL = "--principal"; String ARG_PRINCIPAL = "--principal";
String ARG_QUEUE = "--queue"; String ARG_QUEUE = "--queue";
String ARG_SHORT_QUEUE = "-q";
String ARG_LIFETIME = "--lifetime"; String ARG_LIFETIME = "--lifetime";
String ARG_RESOURCE = "--resource"; String ARG_RESOURCE = "--resource";
String ARG_RESOURCE_MANAGER = "--rm"; String ARG_RESOURCE_MANAGER = "--rm";

View File

@ -59,6 +59,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE; import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
@ -356,12 +357,11 @@ private void updateServiceRecord(
YarnRegistryViewForProviders yarnRegistry, ContainerStatus status) { YarnRegistryViewForProviders yarnRegistry, ContainerStatus status) {
ServiceRecord record = new ServiceRecord(); ServiceRecord record = new ServiceRecord();
String containerId = status.getContainerId().toString(); String containerId = status.getContainerId().toString();
record.set(YarnRegistryAttributes.YARN_ID, containerId); record.set(YARN_ID, containerId);
record.description = getCompInstanceName(); record.description = getCompInstanceName();
record.set(YarnRegistryAttributes.YARN_PERSISTENCE, record.set(YARN_PERSISTENCE, PersistencePolicies.CONTAINER);
PersistencePolicies.CONTAINER); record.set(YARN_IP, status.getIPs().get(0));
record.set("yarn:ip", status.getIPs()); record.set(YARN_HOSTNAME, status.getHost());
record.set("yarn:hostname", status.getHost());
try { try {
yarnRegistry yarnRegistry
.putComponent(RegistryPathUtils.encodeYarnID(containerId), record); .putComponent(RegistryPathUtils.encodeYarnID(containerId), record);

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.yarn.service.provider; package org.apache.hadoop.yarn.service.provider;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
@ -91,13 +92,16 @@ public void buildContainerLaunchContext(AbstractLauncher launcher,
component, tokensForSubstitution, instance, context); component, tokensForSubstitution, instance, context);
// substitute launch command // substitute launch command
String launchCommand = ProviderUtils String launchCommand = component.getLaunchCommand();
.substituteStrWithTokens(component.getLaunchCommand(), // docker container may have empty commands
tokensForSubstitution); if (!StringUtils.isEmpty(launchCommand)) {
CommandLineBuilder operation = new CommandLineBuilder(); launchCommand = ProviderUtils
operation.add(launchCommand); .substituteStrWithTokens(launchCommand, tokensForSubstitution);
operation.addOutAndErrFiles(OUT_FILE, ERR_FILE); CommandLineBuilder operation = new CommandLineBuilder();
launcher.addCommand(operation.build()); operation.add(launchCommand);
operation.addOutAndErrFiles(OUT_FILE, ERR_FILE);
launcher.addCommand(operation.build());
}
// By default retry forever every 30 seconds // By default retry forever every 30 seconds
launcher.setRetryContext(YarnServiceConf launcher.setRetryContext(YarnServiceConf

View File

@ -22,7 +22,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import static org.apache.hadoop.registry.client.types.AddressTypes.*;
import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ProtocolTypes; import org.apache.hadoop.registry.client.types.ProtocolTypes;
import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.ServiceRecord;
@ -36,6 +35,8 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.registry.client.types.AddressTypes.*;
/** /**
* Static methods to work with registry types primarily endpoints and the * Static methods to work with registry types primarily endpoints and the
* list representation of addresses. * list representation of addresses.

View File

@ -18,6 +18,8 @@
import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xbill.DNS.Name; import org.xbill.DNS.Name;
import org.xbill.DNS.Type; import org.xbill.DNS.Type;
@ -32,7 +34,8 @@
*/ */
public class ApplicationServiceRecordProcessor extends public class ApplicationServiceRecordProcessor extends
BaseServiceRecordProcessor { BaseServiceRecordProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationServiceRecordProcessor.class);
/** /**
* Create an application service record processor. * Create an application service record processor.
* *
@ -57,6 +60,10 @@ public ApplicationServiceRecordProcessor(
*/ */
@Override public void initTypeToInfoMapping(ServiceRecord serviceRecord) @Override public void initTypeToInfoMapping(ServiceRecord serviceRecord)
throws Exception { throws Exception {
if (serviceRecord.external.isEmpty()) {
LOG.info(serviceRecord.description + ": No external endpoints defined.");
return;
}
for (int type : getRecordTypes()) { for (int type : getRecordTypes()) {
switch (type) { switch (type) {
case Type.A: case Type.A:
@ -309,6 +316,9 @@ public AApplicationRecordDescriptor(String path,
throws Exception { throws Exception {
this.setNames(new Name[] {getServiceName()}); this.setNames(new Name[] {getServiceName()});
List<Endpoint> endpoints = serviceRecord.external; List<Endpoint> endpoints = serviceRecord.external;
if (endpoints.isEmpty()) {
return;
}
// TODO: do we need a "hostname" attribute for an application record or // TODO: do we need a "hostname" attribute for an application record or
// can we rely on the first endpoint record. // can we rely on the first endpoint record.
this.setTarget(InetAddress.getByName( this.setTarget(InetAddress.getByName(
@ -342,6 +352,9 @@ public AAAAApplicationRecordDescriptor(String path,
@Override protected void init(ServiceRecord serviceRecord) @Override protected void init(ServiceRecord serviceRecord)
throws Exception { throws Exception {
super.init(serviceRecord); super.init(serviceRecord);
if (getTarget() == null) {
return;
}
try { try {
this.setTarget(getIpv6Address(getTarget())); this.setTarget(getIpv6Address(getTarget()));
} catch (UnknownHostException e) { } catch (UnknownHostException e) {

View File

@ -52,8 +52,8 @@ public abstract class BaseServiceRecordProcessor
private String domain; private String domain;
private static final Pattern USER_NAME = Pattern.compile("/users/(\\w*)/?"); private static final Pattern USER_NAME = Pattern.compile("/users/(\\w*)/?");
private static final String SLIDER_API_PREFIX = private static final String YARN_SERVICE_API_PREFIX =
"classpath:org.apache.slider."; "classpath:org.apache.hadoop.yarn.service.";
private static final String HTTP_API_TYPE = "http://"; private static final String HTTP_API_TYPE = "http://";
/** /**
@ -425,8 +425,8 @@ protected List<String> getTextRecords(Endpoint endpoint) {
*/ */
protected String getDNSApiFragment(String api) { protected String getDNSApiFragment(String api) {
String dnsApi = null; String dnsApi = null;
if (api.startsWith(SLIDER_API_PREFIX)) { if (api.startsWith(YARN_SERVICE_API_PREFIX)) {
dnsApi = api.substring(SLIDER_API_PREFIX.length()); dnsApi = api.substring(YARN_SERVICE_API_PREFIX.length());
} else if (api.startsWith(HTTP_API_TYPE)) { } else if (api.startsWith(HTTP_API_TYPE)) {
dnsApi = "http"; dnsApi = "http";
} }

View File

@ -69,7 +69,8 @@ public class TestRegistryDNS extends Assert {
+ " \"type\" : \"JSONServiceRecord\",\n" + " \"type\" : \"JSONServiceRecord\",\n"
+ " \"description\" : \"Slider Application Master\",\n" + " \"description\" : \"Slider Application Master\",\n"
+ " \"external\" : [ {\n" + " \"external\" : [ {\n"
+ " \"api\" : \"classpath:org.apache.slider.appmaster.ipc\",\n" + " \"api\" : \"classpath:org.apache.hadoop.yarn.service.appmaster.ipc"
+ "\",\n"
+ " \"addressType\" : \"host/port\",\n" + " \"addressType\" : \"host/port\",\n"
+ " \"protocolType\" : \"hadoop/IPC\",\n" + " \"protocolType\" : \"hadoop/IPC\",\n"
+ " \"addresses\" : [ {\n" + " \"addresses\" : [ {\n"
@ -84,7 +85,8 @@ public class TestRegistryDNS extends Assert {
+ " \"uri\" : \"http://192.168.1.5:1027\"\n" + " \"uri\" : \"http://192.168.1.5:1027\"\n"
+ " } ]\n" + " } ]\n"
+ " }, {\n" + " }, {\n"
+ " \"api\" : \"classpath:org.apache.slider.management\",\n" + " \"api\" : \"classpath:org.apache.hadoop.yarn.service.management\""
+ ",\n"
+ " \"addressType\" : \"uri\",\n" + " \"addressType\" : \"uri\",\n"
+ " \"protocolType\" : \"REST\",\n" + " \"protocolType\" : \"REST\",\n"
+ " \"addresses\" : [ {\n" + " \"addresses\" : [ {\n"
@ -92,14 +94,16 @@ public class TestRegistryDNS extends Assert {
+ " } ]\n" + " } ]\n"
+ " } ],\n" + " } ],\n"
+ " \"internal\" : [ {\n" + " \"internal\" : [ {\n"
+ " \"api\" : \"classpath:org.apache.slider.agents.secure\",\n" + " \"api\" : \"classpath:org.apache.hadoop.yarn.service.agents.secure"
+ "\",\n"
+ " \"addressType\" : \"uri\",\n" + " \"addressType\" : \"uri\",\n"
+ " \"protocolType\" : \"REST\",\n" + " \"protocolType\" : \"REST\",\n"
+ " \"addresses\" : [ {\n" + " \"addresses\" : [ {\n"
+ " \"uri\" : \"https://192.168.1.5:47700/ws/v1/slider/agents\"\n" + " \"uri\" : \"https://192.168.1.5:47700/ws/v1/slider/agents\"\n"
+ " } ]\n" + " } ]\n"
+ " }, {\n" + " }, {\n"
+ " \"api\" : \"classpath:org.apache.slider.agents.oneway\",\n" + " \"api\" : \"classpath:org.apache.hadoop.yarn.service.agents.oneway"
+ "\",\n"
+ " \"addressType\" : \"uri\",\n" + " \"addressType\" : \"uri\",\n"
+ " \"protocolType\" : \"REST\",\n" + " \"protocolType\" : \"REST\",\n"
+ " \"addresses\" : [ {\n" + " \"addresses\" : [ {\n"

View File

@ -812,13 +812,13 @@ public String[] getIpAndHost(Container container) {
.executePrivilegedOperation(null, privOp, null, .executePrivilegedOperation(null, privOp, null,
null, true, false); null, true, false);
LOG.info("Docker inspect output for " + containerId + ": " + output); LOG.info("Docker inspect output for " + containerId + ": " + output);
// strip off quotes if any
output = output.replaceAll("['\"]", "");
int index = output.lastIndexOf(','); int index = output.lastIndexOf(',');
if (index == -1) { if (index == -1) {
LOG.error("Incorrect format for ip and host"); LOG.error("Incorrect format for ip and host");
return null; return null;
} }
// strip off quotes if any
output = output.replaceAll("['\"]", "");
String ips = output.substring(0, index).trim(); String ips = output.substring(0, index).trim();
String host = output.substring(index+1).trim(); String host = output.substring(index+1).trim();
String[] ipAndHost = new String[2]; String[] ipAndHost = new String[2];

View File

@ -92,6 +92,7 @@ Usage `yarn service [sub-command] [service-name] [options]`
Options: Options:
--file,-f The local path to the service definition file. --file,-f The local path to the service definition file.
--queue,-q The queue to which the service is submitted.
--example,-e The name of the example service such as: --example,-e The name of the example service such as:
Sleeper A simple service that launches a few non-docker sleep containers on YARN. Sleeper A simple service that launches a few non-docker sleep containers on YARN.
``` ```