YARN-5218. Initial core change for DNS for YARN. Contributed by Jonathan Maron

This commit is contained in:
Jian He 2016-06-12 11:32:03 -07:00
parent 74fff4086e
commit f934f62fc7
24 changed files with 4659 additions and 91 deletions

View File

@ -89,6 +89,7 @@
<curator.version>2.12.0</curator.version> <curator.version>2.12.0</curator.version>
<findbugs.version>3.0.0</findbugs.version> <findbugs.version>3.0.0</findbugs.version>
<spotbugs.version>3.1.0-RC1</spotbugs.version> <spotbugs.version>3.1.0-RC1</spotbugs.version>
<dnsjava.version>2.1.7</dnsjava.version>
<guava.version>11.0.2</guava.version> <guava.version>11.0.2</guava.version>
<guice.version>4.0</guice.version> <guice.version>4.0</guice.version>
@ -1217,6 +1218,13 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
<version>${dnsjava.version}</version>
</dependency>
<dependency> <dependency>
<!-- HACK. Transitive dependency for nimbus-jose-jwt. Needed for <!-- HACK. Transitive dependency for nimbus-jose-jwt. Needed for
packaging. Please re-check this version when updating packaging. Please re-check this version when updating

View File

@ -629,8 +629,18 @@
</Match> </Match>
<Match> <Match>
<Class name="org.apache.hadoop.yarn.api.records.Resource" /> <Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
<Method name="getResources" /> <Method name="addNIOTCP" />
<Bug pattern="EI_EXPOSE_REP" /> <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>
<Match>
<Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
<Method name="addNIOUDP" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>
<Match>
<Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
<Method name="serveNIOTCP" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match> </Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -80,6 +80,11 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.client.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.service.Service;
import java.io.IOException;
/**
* DNS Operations.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface DNSOperations extends Service {
/**
* Register a service based on a service record.
*
* @param path the ZK path.
* @param record record providing DNS registration info.
* @throws IOException Any other IO Exception.
*/
void register(String path, ServiceRecord record)
throws IOException;
/**
* Delete a service's registered endpoints.
*
* If the operation returns without an error then the entry has been
* deleted.
*
* @param path the ZK path.
* @param record service record
* @throws IOException Any other IO Exception
*
*/
void delete(String path, ServiceRecord record)
throws IOException;
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.client.api;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.server.dns.RegistryDNS;
/**
* A factory for DNS operation service instances.
*/
public final class DNSOperationsFactory implements RegistryConstants {
/**
* DNS Implementation type.
*/
public enum DNSImplementation {
DNSJAVA
}
private DNSOperationsFactory() {
}
/**
* Create and initialize a DNS operations instance.
*
* @param conf configuration
* @return a DNS operations instance
*/
public static DNSOperations createInstance(Configuration conf) {
return createInstance("DNSOperations", DNSImplementation.DNSJAVA, conf);
}
/**
* Create and initialize a registry operations instance.
* Access rights will be determined from the configuration.
*
* @param name name of the instance
* @param impl the DNS implementation.
* @param conf configuration
* @return a registry operations instance
*/
public static DNSOperations createInstance(String name,
DNSImplementation impl,
Configuration conf) {
Preconditions.checkArgument(conf != null, "Null configuration");
DNSOperations operations = null;
switch (impl) {
case DNSJAVA:
operations = new RegistryDNS(name);
break;
default:
throw new IllegalArgumentException(
String.format("%s is not available", impl.toString()));
}
//operations.init(conf);
return operations;
}
}

View File

@ -43,17 +43,106 @@ public interface RegistryConstants {
*/ */
String ZK_PREFIX = REGISTRY_PREFIX + "zk."; String ZK_PREFIX = REGISTRY_PREFIX + "zk.";
/**
* Prefix for dns-specific options: {@value}
* <p>
* For clients using other protocols, these options are not supported.
*/
String DNS_PREFIX = REGISTRY_PREFIX + "dns.";
/** /**
* flag to indicate whether or not the registry should * flag to indicate whether or not the registry should
* be enabled in the RM: {@value} * be enabled in the RM: {@value}.
*/ */
String KEY_REGISTRY_ENABLED = REGISTRY_PREFIX + "rm.enabled"; String KEY_REGISTRY_ENABLED = REGISTRY_PREFIX + "rm.enabled";
/** /**
* Defaut value for enabling the registry in the RM: {@value} * Defaut value for enabling the registry in the RM: {@value}.
*/ */
boolean DEFAULT_REGISTRY_ENABLED = false; boolean DEFAULT_REGISTRY_ENABLED = false;
/**
* flag to indicate whether or not the registry should
* be enabled in the RM: {@value}.
*/
String KEY_DNS_ENABLED = DNS_PREFIX + "enabled";
/**
* Defaut value for enabling the DNS in the Registry: {@value}.
*/
boolean DEFAULT_DNS_ENABLED = false;
/**
* DNS domain name key.
*/
String KEY_DNS_DOMAIN = DNS_PREFIX + "domain-name";
/**
* DNS bind address.
*/
String KEY_DNS_BIND_ADDRESS = DNS_PREFIX + "bind-address";
/**
* DNS port number key.
*/
String KEY_DNS_PORT = DNS_PREFIX + "bind-port";
/**
* Default DNS port number.
*/
int DEFAULT_DNS_PORT = 53;
/**
* DNSSEC Enabled?
*/
String KEY_DNSSEC_ENABLED = DNS_PREFIX + "dnssec.enabled";
/**
* DNSSEC Enabled?
*/
String KEY_DNSSEC_PUBLIC_KEY = DNS_PREFIX + "public-key";
/**
* DNSSEC private key file.
*/
String KEY_DNSSEC_PRIVATE_KEY_FILE = DNS_PREFIX + "private-key-file";
/**
* Default DNSSEC private key file path.
*/
String DEFAULT_DNSSEC_PRIVATE_KEY_FILE =
"/etc/hadoop/conf/registryDNS.private";
/**
* Zone subnet.
*/
String KEY_DNS_ZONE_SUBNET = DNS_PREFIX + "zone-subnet";
/**
* Zone subnet mask.
*/
String KEY_DNS_ZONE_MASK = DNS_PREFIX + "zone-mask";
/**
* Zone subnet IP min.
*/
String KEY_DNS_ZONE_IP_MIN = DNS_PREFIX + "zone-ip-min";
/**
* Zone subnet IP max.
*/
String KEY_DNS_ZONE_IP_MAX = DNS_PREFIX + "zone-ip-max";
/**
* DNS Record TTL.
*/
String KEY_DNS_TTL = DNS_PREFIX + "dns-ttl";
/**
* DNS Record TTL.
*/
String KEY_DNS_ZONES_DIR = DNS_PREFIX + "zones-dir";
/** /**
* Key to set if the registry is secure: {@value}. * Key to set if the registry is secure: {@value}.
* Turning it on changes the permissions policy from "open access" * Turning it on changes the permissions policy from "open access"
@ -69,12 +158,12 @@ public interface RegistryConstants {
boolean DEFAULT_REGISTRY_SECURE = false; boolean DEFAULT_REGISTRY_SECURE = false;
/** /**
* Root path in the ZK tree for the registry: {@value} * Root path in the ZK tree for the registry: {@value}.
*/ */
String KEY_REGISTRY_ZK_ROOT = ZK_PREFIX + "root"; String KEY_REGISTRY_ZK_ROOT = ZK_PREFIX + "root";
/** /**
* Default root of the yarn registry: {@value} * Default root of the yarn registry: {@value}.
*/ */
String DEFAULT_ZK_REGISTRY_ROOT = "/registry"; String DEFAULT_ZK_REGISTRY_ROOT = "/registry";
@ -92,7 +181,7 @@ public interface RegistryConstants {
/** /**
* Registry client uses Kerberos: authentication is automatic from * Registry client uses Kerberos: authentication is automatic from
* logged in user * logged in user.
*/ */
String REGISTRY_CLIENT_AUTH_KERBEROS = "kerberos"; String REGISTRY_CLIENT_AUTH_KERBEROS = "kerberos";
@ -104,12 +193,12 @@ public interface RegistryConstants {
String REGISTRY_CLIENT_AUTH_DIGEST = "digest"; String REGISTRY_CLIENT_AUTH_DIGEST = "digest";
/** /**
* No authentication; client is anonymous * No authentication; client is anonymous.
*/ */
String REGISTRY_CLIENT_AUTH_ANONYMOUS = ""; String REGISTRY_CLIENT_AUTH_ANONYMOUS = "";
/** /**
* Registry client authentication ID * Registry client authentication ID.
* <p> * <p>
* This is only used in secure clusters with * This is only used in secure clusters with
* {@link #KEY_REGISTRY_CLIENT_AUTH} set to * {@link #KEY_REGISTRY_CLIENT_AUTH} set to
@ -134,17 +223,17 @@ public interface RegistryConstants {
/** /**
* List of hostname:port pairs defining the * List of hostname:port pairs defining the
* zookeeper quorum binding for the registry {@value} * zookeeper quorum binding for the registry {@value}.
*/ */
String KEY_REGISTRY_ZK_QUORUM = ZK_PREFIX + "quorum"; String KEY_REGISTRY_ZK_QUORUM = ZK_PREFIX + "quorum";
/** /**
* The default zookeeper quorum binding for the registry: {@value} * The default zookeeper quorum binding for the registry: {@value}.
*/ */
String DEFAULT_REGISTRY_ZK_QUORUM = "localhost:2181"; String DEFAULT_REGISTRY_ZK_QUORUM = "localhost:2181";
/** /**
* Zookeeper session timeout in milliseconds: {@value} * Zookeeper session timeout in milliseconds: {@value}.
*/ */
String KEY_REGISTRY_ZK_SESSION_TIMEOUT = String KEY_REGISTRY_ZK_SESSION_TIMEOUT =
ZK_PREFIX + "session.timeout.ms"; ZK_PREFIX + "session.timeout.ms";
@ -259,7 +348,7 @@ public interface RegistryConstants {
String KEY_REGISTRY_CLIENT_JAAS_CONTEXT = REGISTRY_PREFIX + "jaas.context"; String KEY_REGISTRY_CLIENT_JAAS_CONTEXT = REGISTRY_PREFIX + "jaas.context";
/** /**
* default client-side registry JAAS context: {@value} * default client-side registry JAAS context: {@value}.
*/ */
String DEFAULT_REGISTRY_CLIENT_JAAS_CONTEXT = "Client"; String DEFAULT_REGISTRY_CLIENT_JAAS_CONTEXT = "Client";

View File

@ -28,6 +28,9 @@
import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.DeleteBuilder; import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -36,14 +39,14 @@
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException; import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.registry.client.api.RegistryConstants; import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.AuthenticationFailedException; import org.apache.hadoop.registry.client.exceptions.AuthenticationFailedException;
import org.apache.hadoop.registry.client.exceptions.NoChildrenForEphemeralsException; import org.apache.hadoop.registry.client.exceptions.NoChildrenForEphemeralsException;
import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException; import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException;
import org.apache.hadoop.registry.client.exceptions.RegistryIOException; import org.apache.hadoop.registry.client.exceptions.RegistryIOException;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
@ -69,12 +72,12 @@ public class CuratorService extends CompositeService
LoggerFactory.getLogger(CuratorService.class); LoggerFactory.getLogger(CuratorService.class);
/** /**
* the Curator binding * the Curator binding.
*/ */
private CuratorFramework curator; private CuratorFramework curator;
/** /**
* Path to the registry root * Path to the registry root.
*/ */
private String registryRoot; private String registryRoot;
@ -85,17 +88,17 @@ public class CuratorService extends CompositeService
private final RegistryBindingSource bindingSource; private final RegistryBindingSource bindingSource;
/** /**
* Security service * Security service.
*/ */
private RegistrySecurity registrySecurity; private RegistrySecurity registrySecurity;
/** /**
* the connection binding text for messages * the connection binding text for messages.
*/ */
private String connectionDescription; private String connectionDescription;
/** /**
* Security connection diagnostics * Security connection diagnostics.
*/ */
private String securityConnectionDiagnostics = ""; private String securityConnectionDiagnostics = "";
@ -105,8 +108,14 @@ public class CuratorService extends CompositeService
*/ */
private EnsembleProvider ensembleProvider; private EnsembleProvider ensembleProvider;
/**
* Registry tree cache.
*/
private TreeCache treeCache;
/** /**
* Construct the service. * Construct the service.
*
* @param name service name * @param name service name
* @param bindingSource source of binding information. * @param bindingSource source of binding information.
* If null: use this instance * If null: use this instance
@ -122,7 +131,8 @@ public CuratorService(String name, RegistryBindingSource bindingSource) {
/** /**
* Create an instance using this service as the binding source (i.e. read * Create an instance using this service as the binding source (i.e. read
* configuration options from the registry) * configuration options from the registry).
*
* @param name service name * @param name service name
*/ */
public CuratorService(String name) { public CuratorService(String name) {
@ -131,7 +141,8 @@ public CuratorService(String name) {
/** /**
* Init the service. * Init the service.
* This is where the security bindings are set up * This is where the security bindings are set up.
*
* @param conf configuration of the service * @param conf configuration of the service
* @throws Exception * @throws Exception
*/ */
@ -155,6 +166,7 @@ protected void serviceInit(Configuration conf) throws Exception {
/** /**
* Start the service. * Start the service.
* This is where the curator instance is started. * This is where the curator instance is started.
*
* @throws Exception * @throws Exception
*/ */
@Override @Override
@ -167,16 +179,21 @@ protected void serviceStart() throws Exception {
} }
/** /**
* Close the ZK connection if it is open * Close the ZK connection if it is open.
*/ */
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
IOUtils.closeStream(curator); IOUtils.closeStream(curator);
if (treeCache != null) {
treeCache.close();
}
super.serviceStop(); super.serviceStop();
} }
/** /**
* Internal check that a service is in the live state * Internal check that a service is in the live state.
*
* @throws ServiceStateException if not * @throws ServiceStateException if not
*/ */
private void checkServiceLive() throws ServiceStateException { private void checkServiceLive() throws ServiceStateException {
@ -190,6 +207,7 @@ private void checkServiceLive() throws ServiceStateException {
/** /**
* Flag to indicate whether or not the registry is secure. * Flag to indicate whether or not the registry is secure.
* Valid once the service is inited. * Valid once the service is inited.
*
* @return service security policy * @return service security policy
*/ */
public boolean isSecure() { public boolean isSecure() {
@ -197,7 +215,8 @@ public boolean isSecure() {
} }
/** /**
* Get the registry security helper * Get the registry security helper.
*
* @return the registry security helper * @return the registry security helper
*/ */
protected RegistrySecurity getRegistrySecurity() { protected RegistrySecurity getRegistrySecurity() {
@ -205,7 +224,8 @@ protected RegistrySecurity getRegistrySecurity() {
} }
/** /**
* Build the security diagnostics string * Build the security diagnostics string.
*
* @return a string for diagnostics * @return a string for diagnostics
*/ */
protected String buildSecurityDiagnostics() { protected String buildSecurityDiagnostics() {
@ -224,6 +244,7 @@ protected String buildSecurityDiagnostics() {
* Create a new curator instance off the root path; using configuration * Create a new curator instance off the root path; using configuration
* options provided in the service configuration to set timeouts and * options provided in the service configuration to set timeouts and
* retry policy. * retry policy.
*
* @return the newly created creator * @return the newly created creator
*/ */
private CuratorFramework createCurator() throws IOException { private CuratorFramework createCurator() throws IOException {
@ -250,7 +271,8 @@ private CuratorFramework createCurator() throws IOException {
// set the security options // set the security options
// build up the curator itself // build up the curator itself
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory.builder();
builder.ensembleProvider(ensembleProvider) builder.ensembleProvider(ensembleProvider)
.connectionTimeoutMs(connectionTimeout) .connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout) .sessionTimeoutMs(sessionTimeout)
@ -277,7 +299,8 @@ public String toString() {
} }
/** /**
* Get the binding diagnostics * Get the binding diagnostics.
*
* @return a diagnostics string valid after the service is started. * @return a diagnostics string valid after the service is started.
*/ */
public String bindingDiagnosticDetails() { public String bindingDiagnosticDetails() {
@ -287,7 +310,8 @@ public String bindingDiagnosticDetails() {
} }
/** /**
* Create a full path from the registry root and the supplied subdir * Create a full path from the registry root and the supplied subdir.
*
* @param path path of operation * @param path path of operation
* @return an absolute path * @return an absolute path
* @throws IllegalArgumentException if the path is invalide * @throws IllegalArgumentException if the path is invalide
@ -299,6 +323,7 @@ protected String createFullPath(String path) throws IOException {
/** /**
* Get the registry binding source ... this can be used to * Get the registry binding source ... this can be used to
* create new ensemble providers * create new ensemble providers
*
* @return the registry binding source in use * @return the registry binding source in use
*/ */
public RegistryBindingSource getBindingSource() { public RegistryBindingSource getBindingSource() {
@ -308,11 +333,10 @@ public RegistryBindingSource getBindingSource() {
/** /**
* Create the ensemble provider for this registry, by invoking * Create the ensemble provider for this registry, by invoking
* {@link RegistryBindingSource#supplyBindingInformation()} on * {@link RegistryBindingSource#supplyBindingInformation()} on
* the provider stored in {@link #bindingSource} * the provider stored in {@link #bindingSource}.
* Sets {@link #ensembleProvider} to that value; * Sets {@link #ensembleProvider} to that value;
* sets {@link #connectionDescription} to the binding info * sets {@link #connectionDescription} to the binding info
* for use in toString and logging; * for use in toString and logging;
*
*/ */
protected void createEnsembleProvider() { protected void createEnsembleProvider() {
BindingInformation binding = bindingSource.supplyBindingInformation(); BindingInformation binding = bindingSource.supplyBindingInformation();
@ -324,7 +348,8 @@ protected void createEnsembleProvider() {
/** /**
* Supply the binding information. * Supply the binding information.
* This implementation returns a fixed ensemble bonded to * This implementation returns a fixed ensemble bonded to
* the quorum supplied by {@link #buildConnectionString()} * the quorum supplied by {@link #buildConnectionString()}.
*
* @return the binding information * @return the binding information
*/ */
@Override @Override
@ -339,7 +364,8 @@ public BindingInformation supplyBindingInformation() {
/** /**
* Override point: get the connection string used to connect to * Override point: get the connection string used to connect to
* the ZK service * the ZK service.
*
* @return a registry quorum * @return a registry quorum
*/ */
protected String buildConnectionString() { protected String buildConnectionString() {
@ -348,7 +374,8 @@ protected String buildConnectionString() {
} }
/** /**
* Create an IOE when an operation fails * Create an IOE when an operation fails.
*
* @param path path of operation * @param path path of operation
* @param operation operation attempted * @param operation operation attempted
* @param exception caught the exception caught * @param exception caught the exception caught
@ -361,7 +388,8 @@ protected IOException operationFailure(String path,
} }
/** /**
* Create an IOE when an operation fails * Create an IOE when an operation fails.
*
* @param path path of operation * @param path path of operation
* @param operation operation attempted * @param operation operation attempted
* @param exception caught the exception caught * @param exception caught the exception caught
@ -387,7 +415,8 @@ protected IOException operationFailure(String path,
"Authentication Failed: " + exception "Authentication Failed: " + exception
+ "; " + securityConnectionDiagnostics, + "; " + securityConnectionDiagnostics,
exception); exception);
} else if (exception instanceof KeeperException.NoChildrenForEphemeralsException) { } else if (exception instanceof
KeeperException.NoChildrenForEphemeralsException) {
ioe = new NoChildrenForEphemeralsException(path, ioe = new NoChildrenForEphemeralsException(path,
"Cannot create a path under an ephemeral node: " + exception, "Cannot create a path under an ephemeral node: " + exception,
exception); exception);
@ -432,7 +461,8 @@ public boolean maybeCreate(String path,
} }
/** /**
* Stat the file * Stat the file.
*
* @param path path of operation * @param path path of operation
* @return a curator stat entry * @return a curator stat entry
* @throws IOException on a failure * @throws IOException on a failure
@ -457,7 +487,8 @@ public Stat zkStat(String path) throws IOException {
} }
/** /**
* Get the ACLs of a path * Get the ACLs of a path.
*
* @param path path of operation * @param path path of operation
* @return a possibly empty list of ACLs * @return a possibly empty list of ACLs
* @throws IOException * @throws IOException
@ -481,7 +512,8 @@ public List<ACL> zkGetACLS(String path) throws IOException {
} }
/** /**
* Probe for a path existing * Probe for a path existing.
*
* @param path path of operation * @param path path of operation
* @return true if the path was visible from the ZK server * @return true if the path was visible from the ZK server
* queried. * queried.
@ -503,7 +535,8 @@ public boolean zkPathExists(String path) throws IOException {
} }
/** /**
* Verify a path exists * Verify a path exists.
*
* @param path path of operation * @param path path of operation
* @throws PathNotFoundException if the path is absent * @throws PathNotFoundException if the path is absent
* @throws IOException * @throws IOException
@ -514,7 +547,8 @@ public String zkPathMustExist(String path) throws IOException {
} }
/** /**
* Create a directory. It is not an error if it already exists * Create a directory. It is not an error if it already exists.
*
* @param path path to create * @param path path to create
* @param mode mode for path * @param mode mode for path
* @param createParents flag to trigger parent creation * @param createParents flag to trigger parent creation
@ -558,7 +592,8 @@ public boolean zkMkPath(String path,
} }
/** /**
* Recursively make a path * Recursively make a path.
*
* @param path path to create * @param path path to create
* @param acl ACL for path * @param acl ACL for path
* @throws IOException any problem * @throws IOException any problem
@ -574,7 +609,8 @@ public void zkMkParentPath(String path,
/** /**
* Create a path with given data. byte[0] is used for a path * Create a path with given data. byte[0] is used for a path
* without data * without data.
*
* @param path path of operation * @param path path of operation
* @param data initial data * @param data initial data
* @param acls * @param acls
@ -600,7 +636,8 @@ public void zkCreate(String path,
} }
/** /**
* Update the data for a path * Update the data for a path.
*
* @param path path of operation * @param path path of operation
* @param data new data * @param data new data
* @throws IOException * @throws IOException
@ -620,13 +657,14 @@ public void zkUpdate(String path, byte[] data) throws IOException {
} }
/** /**
* Create or update an entry * Create or update an entry.
*
* @param path path * @param path path
* @param data data * @param data data
* @param acl ACL for path -used when creating a new entry * @param acl ACL for path -used when creating a new entry
* @param overwrite enable overwrite * @param overwrite enable overwrite
* @throws IOException
* @return true if the entry was created, false if it was simply updated. * @return true if the entry was created, false if it was simply updated.
* @throws IOException
*/ */
public boolean zkSet(String path, public boolean zkSet(String path,
CreateMode mode, CreateMode mode,
@ -649,7 +687,8 @@ public boolean zkSet(String path,
/** /**
* Delete a directory/directory tree. * Delete a directory/directory tree.
* It is not an error to delete a path that does not exist * It is not an error to delete a path that does not exist.
*
* @param path path of operation * @param path path of operation
* @param recursive flag to trigger recursive deletion * @param recursive flag to trigger recursive deletion
* @param backgroundCallback callback; this being set converts the operation * @param backgroundCallback callback; this being set converts the operation
@ -682,7 +721,8 @@ public void zkDelete(String path,
} }
/** /**
* List all children of a path * List all children of a path.
*
* @param path path of operation * @param path path of operation
* @return a possibly empty list of children * @return a possibly empty list of children
* @throws IOException * @throws IOException
@ -703,7 +743,8 @@ public List<String> zkList(String path) throws IOException {
} }
/** /**
* Read data on a path * Read data on a path.
*
* @param path path of operation * @param path path of operation
* @return the data * @return the data
* @throws IOException read failure * @throws IOException read failure
@ -724,9 +765,10 @@ public byte[] zkRead(String path) throws IOException {
/** /**
* Return a path dumper instance which can do a full dump * Return a path dumper instance which can do a full dump
* of the registry tree in its <code>toString()</code> * of the registry tree in its <code>toString()</code>
* operation * operation.
* @return a class to dump the registry *
* @param verbose verbose flag - includes more details (such as ACLs) * @param verbose verbose flag - includes more details (such as ACLs)
* @return a class to dump the registry
*/ */
public ZKPathDumper dumpPath(boolean verbose) { public ZKPathDumper dumpPath(boolean verbose) {
return new ZKPathDumper(curator, registryRoot, verbose); return new ZKPathDumper(curator, registryRoot, verbose);
@ -734,6 +776,7 @@ public ZKPathDumper dumpPath(boolean verbose) {
/** /**
* Add a new write access entry for all future write operations. * Add a new write access entry for all future write operations.
*
* @param id ID to use * @param id ID to use
* @param pass password * @param pass password
* @throws IOException on any failure to build the digest * @throws IOException on any failure to build the digest
@ -746,16 +789,16 @@ public boolean addWriteAccessor(String id, String pass) throws IOException {
} }
/** /**
* Clear all write accessors * Clear all write accessors.
*/ */
public void clearWriteAccessors() { public void clearWriteAccessors() {
getRegistrySecurity().resetDigestACLs(); getRegistrySecurity().resetDigestACLs();
} }
/** /**
* Diagnostics method to dump a registry robustly. * Diagnostics method to dump a registry robustly.
* Any exception raised is swallowed * Any exception raised is swallowed.
*
* @param verbose verbose path dump * @param verbose verbose path dump
* @return the registry tree * @return the registry tree
*/ */
@ -769,4 +812,79 @@ protected String dumpRegistryRobustly(boolean verbose) {
} }
return ""; return "";
} }
/**
* Registers a listener to path related events.
*
* @param listener the listener.
* @return a handle allowing for the management of the listener.
* @throws Exception if registration fails due to error.
*/
public ListenerHandle registerPathListener(final PathListener listener)
throws Exception {
final TreeCacheListener pathChildrenCacheListener =
new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework,
TreeCacheEvent event)
throws Exception {
String path = null;
if (event != null && event.getData() != null) {
path = event.getData().getPath();
}
assert event != null;
switch (event.getType()) {
case NODE_ADDED:
LOG.info("Informing listener of added node {}", path);
listener.nodeAdded(path);
break;
case NODE_REMOVED:
LOG.info("Informing listener of removed node {}", path);
listener.nodeRemoved(path);
break;
case NODE_UPDATED:
LOG.info("Informing listener of updated node {}", path);
listener.nodeAdded(path);
break;
default:
// do nothing
break;
}
}
};
treeCache.getListenable().addListener(pathChildrenCacheListener);
return new ListenerHandle() {
@Override
public void remove() {
treeCache.getListenable().removeListener(pathChildrenCacheListener);
}
};
}
// TODO: should caches be stopped and then restarted if need be?
/**
* Create the tree cache that monitors the registry for node addition, update,
* and deletion.
*
* @throws Exception if any issue arises during monitoring.
*/
public void monitorRegistryEntries()
throws Exception {
String registryPath =
getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
treeCache = new TreeCache(curator, registryPath);
treeCache.start();
}
} }

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.client.impl.zk;
/**
*
*/
public interface ListenerHandle {
void remove();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.client.impl.zk;
import java.io.IOException;
/**
*
*/
public interface PathListener {
void nodeAdded(String path) throws IOException;
void nodeRemoved(String path) throws IOException;
}

View File

@ -19,13 +19,23 @@
package org.apache.hadoop.registry.client.types.yarn; package org.apache.hadoop.registry.client.types.yarn;
/** /**
* YARN specific attributes in the registry * YARN specific attributes in the registry.
*/ */
public class YarnRegistryAttributes { public final class YarnRegistryAttributes {
/** /**
* ID. For containers: container ID. For application instances, application ID. * Hidden constructor.
*/
private YarnRegistryAttributes() {
}
/**
* ID. For containers: container ID. For application instances,
* application ID.
*/ */
public static final String YARN_ID = "yarn:id"; public static final String YARN_ID = "yarn:id";
public static final String YARN_PERSISTENCE = "yarn:persistence"; public static final String YARN_PERSISTENCE = "yarn:persistence";
public static final String YARN_PATH = "yarn:path";
public static final String YARN_HOSTNAME = "yarn:hostname";
public static final String YARN_IP = "yarn:ip";
} }

View File

@ -0,0 +1,353 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.xbill.DNS.Name;
import org.xbill.DNS.Type;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
/**
* A processor for generating application DNS records from registry service
* records.
*/
public class ApplicationServiceRecordProcessor extends
BaseServiceRecordProcessor {
/**
* Create an application service record processor.
*
* @param record the service record
* @param path the service record registry node path
* @param domain the DNS zone/domain name
* @param zoneSelector returns the zone associated with the provided name.
* @throws Exception if an issue is generated during instantiation.
*/
public ApplicationServiceRecordProcessor(
ServiceRecord record, String path, String domain,
ZoneSelector zoneSelector) throws Exception {
super(record, path, domain, zoneSelector);
}
/**
* Initializes the DNS record type to descriptor mapping based on the
* provided service record.
*
* @param serviceRecord the registry service record.
* @throws Exception if an issue is encountered.
*/
@Override public void initTypeToInfoMapping(ServiceRecord serviceRecord)
throws Exception {
for (int type : getRecordTypes()) {
switch (type) {
case Type.A:
createAInfo(serviceRecord);
break;
case Type.AAAA:
createAAAAInfo(serviceRecord);
break;
case Type.TXT:
createTXTInfo(serviceRecord);
break;
case Type.CNAME:
createCNAMEInfo(serviceRecord);
break;
case Type.SRV:
createSRVInfo(serviceRecord);
break;
default:
throw new IllegalArgumentException("Unknown type " + type);
}
}
}
/**
* Create an application TXT record descriptor.
*
* @param serviceRecord the service record.
* @throws Exception if there is an issue during descriptor creation.
*/
protected void createTXTInfo(ServiceRecord serviceRecord) throws Exception {
List<Endpoint> endpoints = serviceRecord.external;
List<RecordDescriptor> recordDescriptors = new ArrayList<>();
TXTApplicationRecordDescriptor txtInfo;
for (Endpoint endpoint : endpoints) {
txtInfo = new TXTApplicationRecordDescriptor(
serviceRecord, endpoint);
recordDescriptors.add(txtInfo);
}
registerRecordDescriptor(Type.TXT, recordDescriptors);
}
/**
* Create an application SRV record descriptor.
*
* @param serviceRecord the service record.
* @throws Exception if there is an issue during descriptor creation.
*/
protected void createSRVInfo(ServiceRecord serviceRecord) throws Exception {
List<Endpoint> endpoints = serviceRecord.external;
List<RecordDescriptor> recordDescriptors = new ArrayList<>();
SRVApplicationRecordDescriptor srvInfo;
for (Endpoint endpoint : endpoints) {
srvInfo = new SRVApplicationRecordDescriptor(
serviceRecord, endpoint);
recordDescriptors.add(srvInfo);
}
registerRecordDescriptor(Type.SRV, recordDescriptors);
}
/**
* Create an application CNAME record descriptor.
*
* @param serviceRecord the service record.
* @throws Exception if there is an issue during descriptor creation.
*/
protected void createCNAMEInfo(ServiceRecord serviceRecord) throws Exception {
List<Endpoint> endpoints = serviceRecord.external;
List<RecordDescriptor> recordDescriptors = new ArrayList<>();
CNAMEApplicationRecordDescriptor cnameInfo;
for (Endpoint endpoint : endpoints) {
cnameInfo = new CNAMEApplicationRecordDescriptor(
serviceRecord, endpoint);
recordDescriptors.add(cnameInfo);
}
registerRecordDescriptor(Type.CNAME, recordDescriptors);
}
/**
* Create an application AAAA record descriptor.
*
* @param record the service record.
* @throws Exception if there is an issue during descriptor creation.
*/
protected void createAAAAInfo(ServiceRecord record)
throws Exception {
AAAAApplicationRecordDescriptor
recordInfo = new AAAAApplicationRecordDescriptor(
getPath(), record);
registerRecordDescriptor(Type.AAAA, recordInfo);
}
/**
* Create an application A record descriptor.
*
* @param record the service record.
* @throws Exception if there is an issue during descriptor creation.
*/
protected void createAInfo(ServiceRecord record) throws Exception {
AApplicationRecordDescriptor recordInfo = new AApplicationRecordDescriptor(
getPath(), record);
registerRecordDescriptor(Type.A, recordInfo);
}
/**
* Returns the record types associated with a container service record.
*
* @return the record type array
*/
@Override public int[] getRecordTypes() {
return new int[] {Type.A, Type.AAAA, Type.CNAME, Type.SRV, Type.TXT};
}
/**
* An application TXT record descriptor.
*/
class TXTApplicationRecordDescriptor
extends ApplicationRecordDescriptor<List<String>> {
/**
* Creates an application TXT record descriptor.
*
* @param record service record
* @throws Exception
*/
public TXTApplicationRecordDescriptor(ServiceRecord record,
Endpoint endpoint) throws Exception {
super(record, endpoint);
}
/**
* Initializes the descriptor parameters.
*
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord)
throws Exception {
if (getEndpoint() != null) {
this.setNames(new Name[] {getServiceName(), getEndpointName()});
this.setTarget(getTextRecords(getEndpoint()));
}
}
}
/**
* An application SRV record descriptor.
*/
class SRVApplicationRecordDescriptor extends
ApplicationRecordDescriptor<RecordCreatorFactory.HostPortInfo> {
/**
* Creates an application SRV record descriptor.
*
* @param record service record
* @throws Exception
*/
public SRVApplicationRecordDescriptor(ServiceRecord record,
Endpoint endpoint) throws Exception {
super(record, endpoint);
}
/**
* Initializes the descriptor parameters.
*
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord)
throws Exception {
if (getEndpoint() != null) {
this.setNames(new Name[] {getServiceName(), getEndpointName()});
this.setTarget(new RecordCreatorFactory.HostPortInfo(
Name.fromString(getHost(getEndpoint()) + "."), getPort(
getEndpoint())));
}
}
}
/**
* An application CNAME record descriptor.
*/
class CNAMEApplicationRecordDescriptor extends
ApplicationRecordDescriptor<Name> {
/**
* Creates an application CNAME record descriptor.
*
* @param path registry path for service record
* @param record service record
* @throws Exception
*/
public CNAMEApplicationRecordDescriptor(String path,
ServiceRecord record) throws Exception {
super(record);
}
/**
* Creates an application CNAME record descriptor. This descriptor is the
* source for API related CNAME records.
*
* @param record service record
* @param endpoint the API endpoint
* @throws Exception
*/
public CNAMEApplicationRecordDescriptor(ServiceRecord record,
Endpoint endpoint) throws Exception {
super(record, endpoint);
}
/**
* Initializes the descriptor parameters.
*
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord)
throws Exception {
if (getEndpoint() != null) {
this.setNames(new Name[] {getEndpointName()});
this.setTarget(getServiceName());
}
}
}
/**
* An application A record descriptor.
*/
class AApplicationRecordDescriptor
extends ApplicationRecordDescriptor<InetAddress> {
/**
* Creates an application A record descriptor.
*
* @param path registry path for service record
* @param record service record
* @throws Exception
*/
public AApplicationRecordDescriptor(String path,
ServiceRecord record) throws Exception {
super(record);
}
/**
* Initializes the descriptor parameters.
*
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord)
throws Exception {
this.setNames(new Name[] {getServiceName()});
List<Endpoint> endpoints = serviceRecord.external;
// TODO: do we need a "hostname" attribute for an application record or
// can we rely on the first endpoint record.
this.setTarget(InetAddress.getByName(
getHost(endpoints.get(0))));
}
}
/**
* An application AAAA record descriptor.
*/
class AAAAApplicationRecordDescriptor extends AApplicationRecordDescriptor {
/**
* Creates an application AAAA record descriptor.
*
* @param path registry path for service record
* @param record service record
* @throws Exception
*/
public AAAAApplicationRecordDescriptor(String path,
ServiceRecord record) throws Exception {
super(path, record);
}
/**
* Initializes the descriptor parameters.
*
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord)
throws Exception {
super.init(serviceRecord);
try {
this.setTarget(getIpv6Address(getTarget()));
} catch (UnknownHostException e) {
throw new IllegalStateException(e);
}
}
}
}

View File

@ -0,0 +1,469 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.AddressTypes;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.xbill.DNS.Name;
import org.xbill.DNS.ReverseMap;
import org.xbill.DNS.TextParseException;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Provides common service record processing logic.
*/
public abstract class BaseServiceRecordProcessor
implements ServiceRecordProcessor {
private final ZoneSelector zoneSelctor;
private Map<Integer, List<RecordDescriptor>> typeToDescriptorMap =
new HashMap<>();
private String path;
private String domain;
private static final Pattern USER_NAME = Pattern.compile("/users/(\\w*)/?");
private static final String SLIDER_API_PREFIX =
"classpath:org.apache.slider.";
private static final String HTTP_API_TYPE = "http://";
/**
* Creates a service record processor.
*
* @param record the service record.
* @param path the node path for the record in the registry.
* @param domain the target DNS domain for the service record
* associated DNS records.
* @param zoneSelector A selector of the best zone for a given DNS name.
* @throws Exception if an issue is generated during instantiation.
*/
public BaseServiceRecordProcessor(ServiceRecord record, String path,
String domain, ZoneSelector zoneSelector)
throws Exception {
this.setPath(path);
this.domain = domain;
this.zoneSelctor = zoneSelector;
initTypeToInfoMapping(record);
}
/**
* Return the username found in the ZK path.
*
* @param recPath the ZK recPath.
* @return the user name.
*/
protected String getUsername(String recPath) {
String user = "anonymous";
Matcher matcher = USER_NAME.matcher(recPath);
if (matcher.find()) {
user = matcher.group(1);
}
return user;
}
/**
* Return the IPv6 mapped address for the provided IPv4 address. Utilized
* to create corresponding AAAA records.
*
* @param address the IPv4 address.
* @return the mapped IPv6 address.
* @throws UnknownHostException
*/
static InetAddress getIpv6Address(InetAddress address)
throws UnknownHostException {
String[] octets = address.getHostAddress().split("\\.");
byte[] octetBytes = new byte[4];
for (int i = 0; i < 4; ++i) {
octetBytes[i] = (byte) Integer.parseInt(octets[i]);
}
byte[] ipv4asIpV6addr = new byte[16];
ipv4asIpV6addr[10] = (byte) 0xff;
ipv4asIpV6addr[11] = (byte) 0xff;
ipv4asIpV6addr[12] = octetBytes[0];
ipv4asIpV6addr[13] = octetBytes[1];
ipv4asIpV6addr[14] = octetBytes[2];
ipv4asIpV6addr[15] = octetBytes[3];
return Inet6Address.getByAddress(null, ipv4asIpV6addr, 0);
}
/**
* Reverse the string representation of the input IP address.
*
* @param ip the string representation of the IP address.
* @return the reversed IP address.
* @throws UnknownHostException if the ip is unknown.
*/
protected Name reverseIP(String ip) throws UnknownHostException {
return ReverseMap.fromAddress(ip);
}
/**
* Manages the creation and registration of service record generated DNS
* records.
*
* @param command the DNS registration command object (e.g. add_record,
* remove record)
* @throws IOException if the creation or registration generates an issue.
*/
@SuppressWarnings({"unchecked"})
public void manageDNSRecords(RegistryDNS.RegistryCommand command)
throws IOException {
for (Map.Entry<Integer, List<RecordDescriptor>> entry :
typeToDescriptorMap.entrySet()) {
for (RecordDescriptor recordDescriptor : entry.getValue()) {
for (Name name : recordDescriptor.getNames()) {
RecordCreatorFactory.RecordCreator recordCreator =
RecordCreatorFactory.getRecordCreator(entry.getKey());
command.exec(zoneSelctor.findBestZone(name),
recordCreator.create(name, recordDescriptor.getTarget()));
}
}
}
}
/**
* Add the DNS record descriptor object to the record type to descriptor
* mapping.
*
* @param type the DNS record type.
* @param recordDescriptor the DNS record descriptor
*/
protected void registerRecordDescriptor(int type,
RecordDescriptor recordDescriptor) {
List<RecordDescriptor> infos = new ArrayList<>();
infos.add(recordDescriptor);
typeToDescriptorMap.put(type, infos);
}
/**
* Add the DNS record descriptor objects to the record type to descriptor
* mapping.
*
* @param type the DNS record type.
* @param recordDescriptors the DNS record descriptors
*/
protected void registerRecordDescriptor(int type,
List<RecordDescriptor> recordDescriptors) {
typeToDescriptorMap.put(type, recordDescriptors);
}
/**
* Return the path associated with the record.
* @return the path.
*/
protected String getPath() {
return path;
}
/**
* Set the path associated with the record.
* @param path the path.
*/
protected void setPath(String path) {
this.path = path;
}
/**
* A descriptor container the information to be populated into a DNS record.
*
* @param <T> the DNS record type/class.
*/
abstract class RecordDescriptor<T> {
private final ServiceRecord record;
private Name[] names;
private T target;
/**
* Creates a DNS record descriptor.
*
* @param record the associated service record.
*/
public RecordDescriptor(ServiceRecord record) {
this.record = record;
}
/**
* Returns the DNS names associated with the record type and information.
*
* @return the array of names.
*/
public Name[] getNames() {
return names;
}
/**
* Return the target object for the DNS record.
*
* @return the DNS record target.
*/
public T getTarget() {
return target;
}
/**
* Initializes the names and information for this DNS record descriptor.
*
* @param serviceRecord the service record.
* @throws Exception
*/
protected abstract void init(ServiceRecord serviceRecord) throws Exception;
/**
* Returns the service record.
* @return the service record.
*/
public ServiceRecord getRecord() {
return record;
}
/**
* Sets the names associated with the record type and information.
* @param names the names.
*/
public void setNames(Name[] names) {
this.names = names;
}
/**
* Sets the target object associated with the record.
* @param target the target.
*/
public void setTarget(T target) {
this.target = target;
}
}
/**
* A container-based DNS record descriptor.
*
* @param <T> the DNS record type/class.
*/
abstract class ContainerRecordDescriptor<T> extends RecordDescriptor<T> {
public ContainerRecordDescriptor(String path, ServiceRecord record)
throws Exception {
super(record);
init(record);
}
/**
* Returns the DNS name constructed from the YARN container ID.
*
* @return the container ID name.
* @throws TextParseException
*/
protected Name getContainerIDName() throws TextParseException {
String containerID = RegistryPathUtils.lastPathEntry(getPath());
containerID = containerID.replace("container", "ctr");
return Name.fromString(String.format("%s.%s", containerID, domain));
}
/**
* Returns the DNS name constructed from the container role/component name.
*
* @return the DNS naem.
* @throws PathNotFoundException
* @throws TextParseException
*/
protected Name getContainerName()
throws PathNotFoundException, TextParseException {
String service = RegistryPathUtils.lastPathEntry(
RegistryPathUtils.parentOf(RegistryPathUtils.parentOf(getPath())));
String description = getRecord().description.toLowerCase();
String user = getUsername(getPath());
return Name.fromString(MessageFormat.format("{0}.{1}.{2}.{3}",
description,
service,
user,
domain));
}
}
/**
* An application-based DNS record descriptor.
*
* @param <T> the DNS record type/class.
*/
abstract class ApplicationRecordDescriptor<T> extends RecordDescriptor<T> {
private Endpoint srEndpoint;
/**
* Creates an application associated DNS record descriptor.
*
* @param record the service record.
* @throws Exception
*/
public ApplicationRecordDescriptor(ServiceRecord record)
throws Exception {
this(record, null);
}
/**
* Creates an application associated DNS record descriptor. The endpoint
* is leverated to create an associated application API record.
*
* @param record the service record.
* @param endpoint an API endpoint.
* @throws Exception
*/
public ApplicationRecordDescriptor(ServiceRecord record,
Endpoint endpoint) throws Exception {
super(record);
this.setEndpoint(endpoint);
init(record);
}
/**
* Get the service's DNS name for registration.
*
* @return the service DNS name.
* @throws TextParseException
*/
protected Name getServiceName() throws TextParseException {
String user = getUsername(getPath());
String service =
String.format("%s.%s.%s",
RegistryPathUtils.lastPathEntry(getPath()),
user,
domain);
return Name.fromString(service);
}
/**
* Get the host from the provided endpoint record.
*
* @param endpoint the endpoint info.
* @return the host name.
*/
protected String getHost(Endpoint endpoint) {
String host = null;
// assume one address for now
Map<String, String> address = endpoint.addresses.get(0);
if (endpoint.addressType.equals(AddressTypes.ADDRESS_HOSTNAME_AND_PORT)) {
host = address.get(AddressTypes.ADDRESS_HOSTNAME_FIELD);
} else if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
URI uri = URI.create(address.get("uri"));
host = uri.getHost();
}
return host;
}
/**
* Get the post from the provided endpoint record.
*
* @param endpoint the endpoint info.
* @return the port.
*/
protected int getPort(Endpoint endpoint) {
int port = -1;
// assume one address for now
Map<String, String> address = endpoint.addresses.get(0);
if (endpoint.addressType.equals(AddressTypes.ADDRESS_HOSTNAME_AND_PORT)) {
port = Integer.parseInt(address.get(AddressTypes.ADDRESS_PORT_FIELD));
} else if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
URI uri = URI.create(address.get("uri"));
port = uri.getPort();
}
return port;
}
/**
* Get the list of strings that can be related in a TXT record for the given
* endpoint.
*
* @param endpoint the endpoint information.
* @return the list of strings relating endpoint info.
*/
protected List<String> getTextRecords(Endpoint endpoint) {
Map<String, String> address = endpoint.addresses.get(0);
List<String> txtRecs = new ArrayList<String>();
txtRecs.add("api=" + getDNSApiFragment(endpoint.api));
if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
URI uri = URI.create(address.get("uri"));
txtRecs.add("path=" + uri.getPath());
}
return txtRecs;
}
/**
* Get an API name that is compatible with DNS standards (and shortened).
*
* @param api the api indicator.
* @return the shortened and compatible api name.
*/
protected String getDNSApiFragment(String api) {
String dnsApi = null;
if (api.startsWith(SLIDER_API_PREFIX)) {
dnsApi = api.substring(SLIDER_API_PREFIX.length());
} else if (api.startsWith(HTTP_API_TYPE)) {
dnsApi = "http";
}
assert dnsApi != null;
dnsApi = dnsApi.replace('.', '-');
return dnsApi;
}
/**
* Return the DNS name associated with the API endpoint.
*
* @return the name.
* @throws TextParseException
*/
protected Name getEndpointName() throws TextParseException {
return Name.fromString(String.format("%s-api.%s",
getDNSApiFragment(
getEndpoint().api),
getServiceName()));
}
/**
* Returns the endpoint.
* @return the endpoint.
*/
public Endpoint getEndpoint() {
return srEndpoint;
}
/**
* Sets the endpoint.
* @param endpoint the endpoint.
*/
public void setEndpoint(
Endpoint endpoint) {
this.srEndpoint = endpoint;
}
}
}

View File

@ -0,0 +1,278 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.xbill.DNS.Name;
import org.xbill.DNS.TextParseException;
import org.xbill.DNS.Type;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
/**
* A processor for generating container DNS records from registry service
* records.
*/
public class ContainerServiceRecordProcessor extends
BaseServiceRecordProcessor {
/**
* Create a container service record processor.
* @param record the service record
* @param path the service record registry node path
* @param domain the DNS zone/domain name
* @param zoneSelector returns the zone associated with the provided name.
* @throws Exception if an issue is generated during instantiation.
*/
public ContainerServiceRecordProcessor(
ServiceRecord record, String path, String domain,
ZoneSelector zoneSelector) throws Exception {
super(record, path, domain, zoneSelector);
}
/**
* Initializes the DNS record type to descriptor mapping based on the
* provided service record.
* @param serviceRecord the registry service record.
* @throws Exception if an issue arises.
*/
@Override public void initTypeToInfoMapping(ServiceRecord serviceRecord)
throws Exception {
if (serviceRecord.get(YarnRegistryAttributes.YARN_IP) != null) {
for (int type : getRecordTypes()) {
switch (type) {
case Type.A:
createAInfo(serviceRecord);
break;
case Type.AAAA:
createAAAAInfo(serviceRecord);
break;
case Type.PTR:
createPTRInfo(serviceRecord);
break;
case Type.TXT:
createTXTInfo(serviceRecord);
break;
default:
throw new IllegalArgumentException("Unknown type " + type);
}
}
}
}
/**
* Create a container TXT record descriptor.
* @param serviceRecord the service record.
* @throws Exception if the descriptor creation yields an issue.
*/
protected void createTXTInfo(ServiceRecord serviceRecord) throws Exception {
TXTContainerRecordDescriptor txtInfo =
new TXTContainerRecordDescriptor(getPath(), serviceRecord);
registerRecordDescriptor(Type.TXT, txtInfo);
}
/**
* Creates a container PTR record descriptor.
* @param record the service record.
* @throws Exception if the descriptor creation yields an issue.
*/
protected void createPTRInfo(ServiceRecord record) throws Exception {
PTRContainerRecordDescriptor
ptrInfo = new PTRContainerRecordDescriptor(getPath(), record);
registerRecordDescriptor(Type.PTR, ptrInfo);
}
/**
* Creates a container AAAA (IPv6) record descriptor.
* @param record the service record
* @throws Exception if the descriptor creation yields an issue.
*/
protected void createAAAAInfo(ServiceRecord record)
throws Exception {
AAAAContainerRecordDescriptor
recordInfo = new AAAAContainerRecordDescriptor(
getPath(), record);
registerRecordDescriptor(Type.AAAA, recordInfo);
}
/**
* Creates a container A (IPv4) record descriptor.
* @param record service record.
* @throws Exception if the descriptor creation yields an issue.
*/
protected void createAInfo(ServiceRecord record) throws Exception {
AContainerRecordDescriptor recordInfo = new AContainerRecordDescriptor(
getPath(), record);
registerRecordDescriptor(Type.A, recordInfo);
}
/**
* Returns the record types associated with a container service record.
* @return the record type array
*/
@Override public int[] getRecordTypes() {
return new int[] {Type.A, Type.AAAA, Type.PTR, Type.TXT};
}
/**
* A container TXT record descriptor.
*/
class TXTContainerRecordDescriptor
extends ContainerRecordDescriptor<List<String>> {
/**
* Creates a container TXT record descriptor.
* @param path registry path for service record
* @param record service record
* @throws Exception
*/
public TXTContainerRecordDescriptor(String path,
ServiceRecord record) throws Exception {
super(path, record);
}
/**
* Initializes the descriptor parameters.
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord) {
try {
this.setNames(new Name[] {getContainerIDName()});
} catch (TextParseException e) {
// log
}
List<String> txts = new ArrayList<>();
txts.add("id=" + serviceRecord.get(YarnRegistryAttributes.YARN_ID));
this.setTarget(txts);
}
}
/**
* A container PTR record descriptor.
*/
class PTRContainerRecordDescriptor extends ContainerRecordDescriptor<Name> {
/**
* Creates a container PTR record descriptor.
* @param path registry path for service record
* @param record service record
* @throws Exception
*/
public PTRContainerRecordDescriptor(String path,
ServiceRecord record) throws Exception {
super(path, record);
}
/**
* Initializes the descriptor parameters.
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord) {
String host = serviceRecord.get(YarnRegistryAttributes.YARN_HOSTNAME);
String ip = serviceRecord.get(YarnRegistryAttributes.YARN_IP);
Name reverseLookupName = null;
if (host != null && ip != null) {
try {
reverseLookupName = reverseIP(ip);
} catch (UnknownHostException e) {
//LOG
}
}
this.setNames(new Name[] {reverseLookupName});
try {
this.setTarget(getContainerIDName());
} catch (TextParseException e) {
//LOG
}
}
}
/**
* A container A record descriptor.
*/
class AContainerRecordDescriptor
extends ContainerRecordDescriptor<InetAddress> {
/**
* Creates a container A record descriptor.
* @param path registry path for service record
* @param record service record
* @throws Exception
*/
public AContainerRecordDescriptor(String path,
ServiceRecord record) throws Exception {
super(path, record);
}
/**
* Initializes the descriptor parameters.
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord) {
String ip = serviceRecord.get(YarnRegistryAttributes.YARN_IP);
if (ip == null) {
throw new IllegalArgumentException("No IP specified");
}
try {
this.setTarget(InetAddress.getByName(ip));
this.setNames(new Name[] {getContainerName(), getContainerIDName()});
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
/**
* A container AAAA record descriptor.
*/
class AAAAContainerRecordDescriptor extends AContainerRecordDescriptor {
/**
* Creates a container AAAA record descriptor.
* @param path registry path for service record
* @param record service record
* @throws Exception
*/
public AAAAContainerRecordDescriptor(String path,
ServiceRecord record) throws Exception {
super(path, record);
}
/**
* Initializes the descriptor parameters.
* @param serviceRecord the service record.
*/
@Override protected void init(ServiceRecord serviceRecord) {
super.init(serviceRecord);
try {
this.setTarget(getIpv6Address(getTarget()));
} catch (UnknownHostException e) {
throw new IllegalStateException(e);
}
}
}
}

View File

@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.xbill.DNS.AAAARecord;
import org.xbill.DNS.ARecord;
import org.xbill.DNS.CNAMERecord;
import org.xbill.DNS.DClass;
import org.xbill.DNS.Name;
import org.xbill.DNS.PTRRecord;
import org.xbill.DNS.Record;
import org.xbill.DNS.SRVRecord;
import org.xbill.DNS.TXTRecord;
import java.net.InetAddress;
import java.util.List;
import static org.xbill.DNS.Type.*;
/**
* A factory for creating DNS records.
*/
public final class RecordCreatorFactory {
private static long ttl;
/**
* Private constructor.
*/
private RecordCreatorFactory() {
}
/**
* Returns the DNS record creator for the provided type.
*
* @param type the DNS record type.
* @return the record creator.
*/
static RecordCreator getRecordCreator(int type) {
switch (type) {
case A:
return new ARecordCreator();
case CNAME:
return new CNAMERecordCreator();
case TXT:
return new TXTRecordCreator();
case AAAA:
return new AAAARecordCreator();
case PTR:
return new PTRRecordCreator();
case SRV:
return new SRVRecordCreator();
default:
throw new IllegalArgumentException("No type " + type);
}
}
/**
* Set the TTL value for the records created by the factory.
*
* @param ttl the ttl value, in seconds.
*/
public static void setTtl(long ttl) {
RecordCreatorFactory.ttl = ttl;
}
/**
* A DNS Record creator.
*
* @param <R> the record type
* @param <T> the record's target type
*/
public interface RecordCreator<R extends Record, T> {
R create(Name name, T target);
}
/**
* An A Record creator.
*/
static class ARecordCreator implements RecordCreator<ARecord, InetAddress> {
/**
* Creates an A record creator.
*/
public ARecordCreator() {
}
/**
* Creates a DNS A record.
*
* @param name the record name.
* @param target the record target/value.
* @return an A record.
*/
@Override public ARecord create(Name name, InetAddress target) {
return new ARecord(name, DClass.IN, ttl, target);
}
}
/**
* An AAAA Record creator.
*/
static class AAAARecordCreator
implements RecordCreator<AAAARecord, InetAddress> {
/**
* Creates an AAAA record creator.
*/
public AAAARecordCreator() {
}
/**
* Creates a DNS AAAA record.
*
* @param name the record name.
* @param target the record target/value.
* @return an A record.
*/
@Override public AAAARecord create(Name name, InetAddress target) {
return new AAAARecord(name, DClass.IN, ttl, target);
}
}
static class CNAMERecordCreator implements RecordCreator<CNAMERecord, Name> {
/**
* Creates a CNAME record creator.
*/
public CNAMERecordCreator() {
}
/**
* Creates a DNS CNAME record.
*
* @param name the record name.
* @param target the record target/value.
* @return an A record.
*/
@Override public CNAMERecord create(Name name, Name target) {
return new CNAMERecord(name, DClass.IN, ttl, target);
}
}
/**
* A TXT Record creator.
*/
static class TXTRecordCreator
implements RecordCreator<TXTRecord, List<String>> {
/**
* Creates a TXT record creator.
*/
public TXTRecordCreator() {
}
/**
* Creates a DNS TXT record.
*
* @param name the record name.
* @param target the record target/value.
* @return an A record.
*/
@Override public TXTRecord create(Name name, List<String> target) {
return new TXTRecord(name, DClass.IN, ttl, target);
}
}
/**
* A PTR Record creator.
*/
static class PTRRecordCreator implements RecordCreator<PTRRecord, Name> {
/**
* Creates a PTR record creator.
*/
public PTRRecordCreator() {
}
/**
* Creates a DNS PTR record.
*
* @param name the record name.
* @param target the record target/value.
* @return an A record.
*/
@Override public PTRRecord create(Name name, Name target) {
return new PTRRecord(name, DClass.IN, ttl, target);
}
}
/**
* A SRV Record creator.
*/
static class SRVRecordCreator
implements RecordCreator<SRVRecord, HostPortInfo> {
/**
* Creates a SRV record creator.
*/
public SRVRecordCreator() {
}
/**
* Creates a DNS SRV record.
*
* @param name the record name.
* @param target the record target/value.
* @return an A record.
*/
@Override public SRVRecord create(Name name, HostPortInfo target) {
return new SRVRecord(name, DClass.IN, ttl, 1, 1, target.getPort(),
target.getHost());
}
}
/**
* An object for storing the host and port info used to generate SRV records.
*/
public static class HostPortInfo {
private Name host;
private int port;
/**
* Creates an object with a host and port pair.
*
* @param host the hostname/ip
* @param port the port value
*/
public HostPortInfo(Name host, int port) {
this.setHost(host);
this.setPort(port);
}
/**
* Return the host name.
* @return the host name.
*/
Name getHost() {
return host;
}
/**
* Set the host name.
* @param host the host name.
*/
void setHost(Name host) {
this.host = host;
}
/**
* Get the port.
* @return the port.
*/
int getPort() {
return port;
}
/**
* Set the port.
* @param port the port.
*/
void setPort(int port) {
this.port = port;
}
}
}

View File

@ -0,0 +1,290 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import com.google.common.base.Preconditions;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.client.api.DNSOperationsFactory;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.impl.zk.PathListener;
import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* A server/service that starts and manages the lifecycle of a DNS registry
* instance.
*/
public class RegistryDNSServer extends CompositeService {
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private RegistryDNS registryDNS;
private RegistryOperationsService registryOperations;
private static final Logger LOG =
LoggerFactory.getLogger(RegistryDNS.class);
private ConcurrentMap<String, ServiceRecord> pathToRecordMap;
/**
* Creates the DNS server.
* @param name the server name.
*/
public RegistryDNSServer(String name) {
super(name);
}
/**
* Initializes the DNS server.
* @param conf the hadoop configuration instance.
* @throws Exception if service initialization fails.
*/
@Override
protected void serviceInit(Configuration conf) throws Exception {
pathToRecordMap = new ConcurrentHashMap<>();
registryOperations = new RegistryOperationsService("RegistryDNSOperations");
addService(registryOperations);
// probably need to populate with existing apps?
registryDNS = (RegistryDNS) DNSOperationsFactory.createInstance(conf);
addService(registryDNS);
super.serviceInit(conf);
}
/**
* Starts the server.
* @throws Exception if service start fails.
*/
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
manageRegistryDNS();
}
/**
* Performs operations required to setup the DNS registry instance (e.g. sets
* up a path listener to react to service record creation/deletion and invoke
* the appropriate registry method).
*/
private void manageRegistryDNS() {
try {
registryOperations.monitorRegistryEntries();
registryOperations.registerPathListener(new PathListener() {
private String registryRoot = getConfig().
get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
@Override
public void nodeAdded(String path) throws IOException {
// get a listing of service records
String relativePath = getPathRelativeToRegistryRoot(path);
String child = RegistryPathUtils.lastPathEntry(path);
Map<String, RegistryPathStatus> map = new HashMap<>();
map.put(child, registryOperations.stat(relativePath));
Map<String, ServiceRecord> records =
RegistryUtils.extractServiceRecords(registryOperations,
getAdjustedParentPath(path),
map);
processServiceRecords(records, register);
pathToRecordMap.putAll(records);
}
private String getAdjustedParentPath(String path) {
Preconditions.checkNotNull(path);
String adjustedPath = null;
adjustedPath = getPathRelativeToRegistryRoot(path);
try {
return RegistryPathUtils.parentOf(adjustedPath);
} catch (PathNotFoundException e) {
// attempt to use passed in path
return path;
}
}
private String getPathRelativeToRegistryRoot(String path) {
String adjustedPath;
if (path.equals(registryRoot)) {
adjustedPath = "/";
} else {
adjustedPath = path.substring(registryRoot.length());
}
return adjustedPath;
}
@Override
public void nodeRemoved(String path) throws IOException {
ServiceRecord record = pathToRecordMap.remove(path.substring(
registryRoot.length()));
processServiceRecord(path, record, delete);
}
});
// create listener for record deletions
} catch (Exception e) {
LOG.warn("Unable to monitor the registry. DNS support disabled.", e);
}
}
/**
* A registry management command interface.
*/
interface ManagementCommand {
void exec(String path, ServiceRecord record) throws IOException;
}
/**
* Performs registry service record registration.
*/
private final ManagementCommand register = new ManagementCommand() {
@Override
public void exec(String path, ServiceRecord record) throws IOException {
if (record != null) {
LOG.info("Registering DNS records for {}", path);
registryDNS.register(path, record);
}
}
};
/**
* Performs registry service record deletion.
*/
private ManagementCommand delete = new ManagementCommand() {
@Override
public void exec(String path, ServiceRecord record) throws IOException {
if (record != null) {
LOG.info("Deleting DNS records for {}", path);
registryDNS.delete(path, record);
}
}
};
/**
* iterates thru the supplied service records, executing the provided registry
* command.
* @param records the service records.
* @param command the registry command.
* @throws IOException
*/
private void processServiceRecords(Map<String, ServiceRecord> records,
ManagementCommand command)
throws IOException {
for (Map.Entry<String, ServiceRecord> entry : records.entrySet()) {
processServiceRecord(entry.getKey(), entry.getValue(), command);
}
}
/**
* Process the service record, parsing the information and creating the
* required DNS records.
* @param path the service record path.
* @param record the record.
* @param command the registry command to execute.
* @throws IOException
*/
private void processServiceRecord(String path, ServiceRecord record,
ManagementCommand command)
throws IOException {
command.exec(path, record);
}
/**
* Launch the server.
* @param args command line args.
* @return
*/
static RegistryDNSServer launchDNSServer(String[] args) {
RegistryDNSServer dnsServer = null;
Thread
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(RegistryDNSServer.class, args,
LOG);
try {
dnsServer = new RegistryDNSServer("RegistryDNSServer");
ShutdownHookManager.get().addShutdownHook(
new CompositeService.CompositeServiceShutdownHook(dnsServer),
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration();
processCommandLine(args, conf);
new GenericOptionsParser(conf, args);
dnsServer.init(conf);
dnsServer.start();
} catch (Throwable t) {
LOG.error("Error starting Registry DNS Server", t);
ExitUtil.terminate(-1, "Error starting Registry DNS Server");
}
return dnsServer;
}
/**
* Process input command line arguments.
* @param args the command line argument array.
* @param conf the configuration.
*/
private static void processCommandLine(String[] args,
YarnConfiguration conf) {
Options options = new Options();
options.addOption("p", "port", true,
"the server listening port (override)");
CommandLineParser parser = new BasicParser();
try {
CommandLine cmd = parser.parse(options, args);
if (cmd.hasOption("p")) {
conf.set(RegistryConstants.KEY_DNS_PORT, cmd.getOptionValue("p"));
}
} catch (ParseException e) {
LOG.error("Error parsing the command line options", e);
}
}
/**
* Lanches the server instance.
* @param args the command line args.
*/
public static void main(String[] args) {
launchDNSServer(args);
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.xbill.DNS.DClass;
import org.xbill.DNS.NXTRecord;
import org.xbill.DNS.Name;
import org.xbill.DNS.RRset;
import org.xbill.DNS.Record;
import org.xbill.DNS.SetResponse;
import org.xbill.DNS.Type;
import org.xbill.DNS.Zone;
import org.xbill.DNS.ZoneTransferException;
import org.xbill.DNS.ZoneTransferIn;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
* A zone implementation geared to support some DNSSEC functionality.
*/
public class SecureableZone extends Zone {
private List<Record> records;
/**
* Creates a Zone by doing the specified zone transfer.
* @param xfrin The incoming zone transfer to execute.
* @throws IOException if there is an error.
* @throws ZoneTransferException if there is an error.
*/
public SecureableZone(ZoneTransferIn xfrin)
throws IOException, ZoneTransferException {
super(xfrin);
}
/**
* Creates a Zone by performing a zone transfer to the specified host.
* @param zone zone name.
* @param dclass the dclass
* @param remote the remote host.
* @throws IOException if there is an error.
* @throws ZoneTransferException if there is an error.
*/
public SecureableZone(Name zone, int dclass, String remote)
throws IOException, ZoneTransferException {
super(zone, dclass, remote);
}
/**
* Creates a Zone from the records in the specified master file.
* @param zone The name of the zone.
* @param file The master file to read from.
* @throws IOException if there is an error.
*/
public SecureableZone(Name zone, String file) throws IOException {
super(zone, file);
}
/**
* Creates a Zone from an array of records.
* @param zone The name of the zone.
* @param records The records to add to the zone.
* @throws IOException if there is an error.
*/
public SecureableZone(Name zone, Record[] records)
throws IOException {
super(zone, records);
}
/**
* Adds a Record to the Zone.
* @param r The record to be added
* @see Record
*/
@Override public void addRecord(Record r) {
if (records == null) {
records = new ArrayList<Record>();
}
super.addRecord(r);
records.add(r);
}
/**
* Removes a record from the Zone.
* @param r The record to be removed
* @see Record
*/
@Override public void removeRecord(Record r) {
if (records == null) {
records = new ArrayList<Record>();
}
super.removeRecord(r);
records.remove(r);
}
/**
* Return a NXT record appropriate for the query.
* @param queryRecord the query record.
* @param zone the zone to search.
* @return the NXT record describing the insertion point.
*/
@SuppressWarnings({"unchecked"})
public Record getNXTRecord(Record queryRecord, Zone zone) {
Collections.sort(records);
int index = Collections.binarySearch(records, queryRecord,
new Comparator<Record>() {
@Override public int compare(Record r1, Record r2) {
return r1.compareTo(r2);
}
});
if (index >= 0) {
return null;
}
index = -index - 1;
if (index >= records.size()) {
index = records.size() - 1;
}
Record base = records.get(index);
SetResponse sr = zone.findRecords(base.getName(), Type.ANY);
BitSet bitMap = new BitSet();
bitMap.set(Type.NXT);
RRset[] rRsets = sr.answers();
for (RRset rRset : rRsets) {
int typeCode = rRset.getType();
if (typeCode > 0 && typeCode < 128) {
bitMap.set(typeCode);
}
}
return new NXTRecord(base.getName(), DClass.IN, zone.getSOA().getMinimum(),
queryRecord.getName(), bitMap);
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import java.io.IOException;
/**
* Manage the processing of service records in order to create DNS records.
*/
public interface ServiceRecordProcessor {
/**
* Initialize the mapping between DNS record type and record information
* for the given service record.
* @param serviceRecord the registry service record.
* @throws Exception if encountering an error.
*/
void initTypeToInfoMapping(ServiceRecord serviceRecord)
throws Exception;
/**
* Return the DNS record types valid for this processor.
* @return the array of DNS record types.
*/
int[] getRecordTypes();
/**
* Manage the creation and registration of DNS records generated by parsing
* a service record.
* @param command the DNS registration command object (e.g. add_record,
* remove record)
* @throws IOException if the creation or registration generates an issue.
*/
void manageDNSRecords(RegistryDNS.RegistryCommand command)
throws IOException;
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.xbill.DNS.Name;
import org.xbill.DNS.Zone;
/**
* A selector that returns the zone associated with a provided name.
*/
public interface ZoneSelector {
/**
* Finds the best matching zone given the provided name.
* @param name the record name for which a zone is requested.
* @return the matching zone.
*/
Zone findBestZone(Name name);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* DNS Server classes.
* <p>
* These classes are leveraged to create a DNS server that can provide the
* facilities necessary for YARN application and/or service discovery.
* </p>
*/
package org.apache.hadoop.registry.server.dns;

View File

@ -0,0 +1,561 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.apache.commons.net.util.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.xbill.DNS.AAAARecord;
import org.xbill.DNS.ARecord;
import org.xbill.DNS.CNAMERecord;
import org.xbill.DNS.DClass;
import org.xbill.DNS.DNSKEYRecord;
import org.xbill.DNS.DNSSEC;
import org.xbill.DNS.Flags;
import org.xbill.DNS.Message;
import org.xbill.DNS.Name;
import org.xbill.DNS.OPTRecord;
import org.xbill.DNS.PTRRecord;
import org.xbill.DNS.RRSIGRecord;
import org.xbill.DNS.RRset;
import org.xbill.DNS.Rcode;
import org.xbill.DNS.Record;
import org.xbill.DNS.SRVRecord;
import org.xbill.DNS.Section;
import org.xbill.DNS.Type;
import java.io.IOException;
import java.math.BigInteger;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.spec.RSAPrivateKeySpec;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_ZONE_MASK;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_ZONE_SUBNET;
/**
*
*/
public class TestRegistryDNS extends Assert {
private RegistryDNS registryDNS;
private RegistryUtils.ServiceRecordMarshal marshal;
private static final String APPLICATION_RECORD = "{\n"
+ " \"type\" : \"JSONServiceRecord\",\n"
+ " \"description\" : \"Slider Application Master\",\n"
+ " \"external\" : [ {\n"
+ " \"api\" : \"classpath:org.apache.slider.appmaster.ipc\",\n"
+ " \"addressType\" : \"host/port\",\n"
+ " \"protocolType\" : \"hadoop/IPC\",\n"
+ " \"addresses\" : [ {\n"
+ " \"host\" : \"192.168.1.5\",\n"
+ " \"port\" : \"1026\"\n"
+ " } ]\n"
+ " }, {\n"
+ " \"api\" : \"http://\",\n"
+ " \"addressType\" : \"uri\",\n"
+ " \"protocolType\" : \"webui\",\n"
+ " \"addresses\" : [ {\n"
+ " \"uri\" : \"http://192.168.1.5:1027\"\n"
+ " } ]\n"
+ " }, {\n"
+ " \"api\" : \"classpath:org.apache.slider.management\",\n"
+ " \"addressType\" : \"uri\",\n"
+ " \"protocolType\" : \"REST\",\n"
+ " \"addresses\" : [ {\n"
+ " \"uri\" : \"http://192.168.1.5:1027/ws/v1/slider/mgmt\"\n"
+ " } ]\n"
+ " } ],\n"
+ " \"internal\" : [ {\n"
+ " \"api\" : \"classpath:org.apache.slider.agents.secure\",\n"
+ " \"addressType\" : \"uri\",\n"
+ " \"protocolType\" : \"REST\",\n"
+ " \"addresses\" : [ {\n"
+ " \"uri\" : \"https://192.168.1.5:47700/ws/v1/slider/agents\"\n"
+ " } ]\n"
+ " }, {\n"
+ " \"api\" : \"classpath:org.apache.slider.agents.oneway\",\n"
+ " \"addressType\" : \"uri\",\n"
+ " \"protocolType\" : \"REST\",\n"
+ " \"addresses\" : [ {\n"
+ " \"uri\" : \"https://192.168.1.5:35531/ws/v1/slider/agents\"\n"
+ " } ]\n"
+ " } ],\n"
+ " \"yarn:id\" : \"application_1451931954322_0016\",\n"
+ " \"yarn:persistence\" : \"application\"\n"
+ "}\n";
static final String CONTAINER_RECORD = "{\n"
+ " \"type\" : \"JSONServiceRecord\",\n"
+ " \"description\" : \"YCLOUD\",\n"
+ " \"external\" : [ ],\n"
+ " \"internal\" : [ ],\n"
+ " \"yarn:id\" : \"container_e50_1451931954322_0016_01_000002\",\n"
+ " \"yarn:persistence\" : \"container\",\n"
+ " \"yarn:ip\" : \"172.17.0.19\",\n"
+ " \"yarn:hostname\" : \"0a134d6329ba\"\n"
+ "}\n";
private static final String CONTAINER_RECORD_NO_IP = "{\n"
+ " \"type\" : \"JSONServiceRecord\",\n"
+ " \"description\" : \"YCLOUD\",\n"
+ " \"external\" : [ ],\n"
+ " \"internal\" : [ ],\n"
+ " \"yarn:id\" : \"container_e50_1451931954322_0016_01_000002\",\n"
+ " \"yarn:persistence\" : \"container\"\n"
+ "}\n";
@Before
public void initialize() throws Exception {
setRegistryDNS(new RegistryDNS("TestRegistry"));
Configuration conf = createConfiguration();
getRegistryDNS().setDomainName(conf);
getRegistryDNS().initializeZones(conf);
setMarshal(new RegistryUtils.ServiceRecordMarshal());
}
protected Configuration createConfiguration() {
Configuration conf = new Configuration();
conf.set(RegistryConstants.KEY_DNS_DOMAIN, "hwx.test");
conf.set(RegistryConstants.KEY_DNS_ZONE_SUBNET, "172.17.0");
conf.setTimeDuration(RegistryConstants.KEY_DNS_TTL, 30L, TimeUnit.SECONDS);
return conf;
}
protected boolean isSecure() {
return false;
}
@After
public void closeRegistry() throws Exception {
getRegistryDNS().stopExecutor();
}
@Test
public void testAppRegistration() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
APPLICATION_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/", record);
// start assessing whether correct records are available
Record[] recs = assertDNSQuery("test1.root.hwx.test.");
assertEquals("wrong result", "192.168.1.5",
((ARecord) recs[0]).getAddress().getHostAddress());
recs = assertDNSQuery("management-api.test1.root.hwx.test.", 2);
assertEquals("wrong target name", "test1.root.hwx.test.",
((CNAMERecord) recs[0]).getTarget().toString());
assertTrue("not an ARecord", recs[isSecure() ? 2 : 1] instanceof ARecord);
recs = assertDNSQuery("appmaster-ipc-api.test1.root.hwx.test.",
Type.SRV, 1);
assertTrue("not an SRV record", recs[0] instanceof SRVRecord);
assertEquals("wrong port", 1026, ((SRVRecord) recs[0]).getPort());
recs = assertDNSQuery("appmaster-ipc-api.test1.root.hwx.test.", 2);
assertEquals("wrong target name", "test1.root.hwx.test.",
((CNAMERecord) recs[0]).getTarget().toString());
assertTrue("not an ARecord", recs[isSecure() ? 2 : 1] instanceof ARecord);
recs = assertDNSQuery("http-api.test1.root.hwx.test.", 2);
assertEquals("wrong target name", "test1.root.hwx.test.",
((CNAMERecord) recs[0]).getTarget().toString());
assertTrue("not an ARecord", recs[isSecure() ? 2 : 1] instanceof ARecord);
recs = assertDNSQuery("http-api.test1.root.hwx.test.", Type.SRV,
1);
assertTrue("not an SRV record", recs[0] instanceof SRVRecord);
assertEquals("wrong port", 1027, ((SRVRecord) recs[0]).getPort());
assertDNSQuery("test1.root.hwx.test.", Type.TXT, 3);
assertDNSQuery("appmaster-ipc-api.test1.root.hwx.test.", Type.TXT, 1);
assertDNSQuery("http-api.test1.root.hwx.test.", Type.TXT, 1);
assertDNSQuery("management-api.test1.root.hwx.test.", Type.TXT, 1);
}
@Test
public void testContainerRegistration() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Record[] recs =
assertDNSQuery("ctr-e50-1451931954322-0016-01-000002.hwx.test.");
assertEquals("wrong result", "172.17.0.19",
((ARecord) recs[0]).getAddress().getHostAddress());
recs = assertDNSQuery("ycloud.test1.root.hwx.test.", 1);
assertTrue("not an ARecord", recs[0] instanceof ARecord);
}
@Test
public void testRecordTTL() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Record[] recs = assertDNSQuery(
"ctr-e50-1451931954322-0016-01-000002.hwx.test.");
assertEquals("wrong result", "172.17.0.19",
((ARecord) recs[0]).getAddress().getHostAddress());
assertEquals("wrong ttl", 30L, recs[0].getTTL());
recs = assertDNSQuery("ycloud.test1.root.hwx.test.", 1);
assertTrue("not an ARecord", recs[0] instanceof ARecord);
assertEquals("wrong ttl", 30L, recs[0].getTTL());
}
@Test
public void testReverseLookup() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Record[] recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
assertEquals("wrong result",
"ctr-e50-1451931954322-0016-01-000002.hwx.test.",
((PTRRecord) recs[0]).getTarget().toString());
}
@Test
public void testReverseLookupInLargeNetwork() throws Exception {
setRegistryDNS(new RegistryDNS("TestRegistry"));
Configuration conf = createConfiguration();
conf.set(RegistryConstants.KEY_DNS_DOMAIN, "hwx.test");
conf.set(KEY_DNS_ZONE_SUBNET, "172.17.0.0");
conf.set(KEY_DNS_ZONE_MASK, "255.255.224.0");
conf.setTimeDuration(RegistryConstants.KEY_DNS_TTL, 30L, TimeUnit.SECONDS);
getRegistryDNS().setDomainName(conf);
getRegistryDNS().initializeZones(conf);
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Record[] recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
assertEquals("wrong result",
"ctr-e50-1451931954322-0016-01-000002.hwx.test.",
((PTRRecord) recs[0]).getTarget().toString());
}
@Test
public void testMissingReverseLookup() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Name name = Name.fromString("19.1.17.172.in-addr.arpa.");
Record question = Record.newRecord(name, Type.PTR, DClass.IN);
Message query = Message.newQuery(question);
OPTRecord optRecord = new OPTRecord(4096, 0, 0, Flags.DO, null);
query.addRecord(optRecord, Section.ADDITIONAL);
byte[] responseBytes = getRegistryDNS().generateReply(query, null);
Message response = new Message(responseBytes);
assertEquals("No answer should be returned", Rcode.NOTAUTH,
response.getRcode());
}
@Test
public void testNoContainerIP() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD_NO_IP.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Name name =
Name.fromString("ctr-e50-1451931954322-0016-01-000002.hwx.test.");
Record question = Record.newRecord(name, Type.A, DClass.IN);
Message query = Message.newQuery(question);
byte[] responseBytes = getRegistryDNS().generateReply(query, null);
Message response = new Message(responseBytes);
assertEquals("wrong status", Rcode.NXDOMAIN, response.getRcode());
}
private Record[] assertDNSQuery(String lookup) throws IOException {
return assertDNSQuery(lookup, Type.A, 1);
}
private Record[] assertDNSQuery(String lookup, int numRecs)
throws IOException {
return assertDNSQuery(lookup, Type.A, numRecs);
}
Record[] assertDNSQuery(String lookup, int type, int numRecs)
throws IOException {
Name name = Name.fromString(lookup);
Record question = Record.newRecord(name, type, DClass.IN);
Message query = Message.newQuery(question);
OPTRecord optRecord = new OPTRecord(4096, 0, 0, Flags.DO, null);
query.addRecord(optRecord, Section.ADDITIONAL);
byte[] responseBytes = getRegistryDNS().generateReply(query, null);
Message response = new Message(responseBytes);
assertEquals("not successful", Rcode.NOERROR, response.getRcode());
assertNotNull("Null response", response);
assertEquals("Questions do not match", query.getQuestion(),
response.getQuestion());
Record[] recs = response.getSectionArray(Section.ANSWER);
assertEquals("wrong number of answer records",
isSecure() ? numRecs * 2 : numRecs, recs.length);
if (isSecure()) {
boolean signed = false;
for (Record record : recs) {
signed = record.getType() == Type.RRSIG;
if (signed) {
break;
}
}
assertTrue("No signatures found", signed);
}
return recs;
}
@Test
public void testDNSKEYRecord() throws Exception {
String publicK =
"AwEAAe1Jev0Az1khlQCvf0nud1/CNHQwwPEu8BNchZthdDxKPVn29yrD "
+ "CHoAWjwiGsOSw3SzIPrawSbHzyJsjn0oLBhGrH6QedFGnydoxjNsw3m/ "
+ "SCmOjR/a7LGBAMDFKqFioi4gOyuN66svBeY+/5uw72+0ei9AQ20gqf6q "
+ "l9Ozs5bV";
// byte[] publicBytes = Base64.decodeBase64(publicK);
// X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicBytes);
// KeyFactory keyFactory = KeyFactory.getInstance("RSA");
// PublicKey pubKey = keyFactory.generatePublic(keySpec);
DNSKEYRecord dnskeyRecord =
new DNSKEYRecord(Name.fromString("hwxstg.site."), DClass.IN, 0,
DNSKEYRecord.Flags.ZONE_KEY,
DNSKEYRecord.Protocol.DNSSEC,
DNSSEC.Algorithm.RSASHA256,
Base64.decodeBase64(publicK.getBytes()));
assertNotNull(dnskeyRecord);
RSAPrivateKeySpec privateSpec = new RSAPrivateKeySpec(new BigInteger(1,
Base64.decodeBase64(
"7Ul6/QDPWSGVAK9/Se53X8I0dDDA8S7wE1yFm2F0PEo9Wfb3KsMIegBaPCIaw5LDd"
+ "LMg+trBJsfPImyOfSgsGEasfpB50UafJ2jGM2zDeb9IKY6NH9rssYEAwMUq"
+ "oWKiLiA7K43rqy8F5j7/m7Dvb7R6L0BDbSCp/qqX07OzltU=")),
new BigInteger(1, Base64.decodeBase64(
"MgbQ6DBYhskeufNGGdct0cGG/4wb0X183ggenwCv2dopDyOTPq+5xMb4Pz9Ndzgk/"
+ "yCY7mpaWIu9rttGOzrR+LBRR30VobPpMK1bMnzu2C0x08oYAguVwZB79DLC"
+ "705qmZpiaaFB+LnhG7VtpPiOBm3UzZxdrBfeq/qaKrXid60=")));
KeyFactory factory = KeyFactory.getInstance("RSA");
PrivateKey priv = factory.generatePrivate(privateSpec);
ARecord aRecord = new ARecord(Name.fromString("some.test."), DClass.IN, 0,
InetAddress.getByName("192.168.0.1"));
Calendar cal = Calendar.getInstance();
Date inception = cal.getTime();
cal.add(Calendar.YEAR, 1);
Date expiration = cal.getTime();
RRset rrset = new RRset(aRecord);
RRSIGRecord rrsigRecord = DNSSEC.sign(rrset,
dnskeyRecord,
priv,
inception,
expiration);
DNSSEC.verify(rrset, rrsigRecord, dnskeyRecord);
}
@Test
public void testIpv4toIpv6() throws Exception {
InetAddress address =
BaseServiceRecordProcessor
.getIpv6Address(InetAddress.getByName("172.17.0.19"));
assertTrue("not an ipv6 address", address instanceof Inet6Address);
assertEquals("wrong IP", "172.17.0.19",
InetAddress.getByAddress(address.getAddress()).getHostAddress());
}
@Test
public void testAAAALookup() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Record[] recs = assertDNSQuery(
"ctr-e50-1451931954322-0016-01-000002.hwx.test.", Type.AAAA, 1);
assertEquals("wrong result", "172.17.0.19",
((AAAARecord) recs[0]).getAddress().getHostAddress());
recs = assertDNSQuery("ycloud.test1.root.hwx.test.", Type.AAAA, 1);
assertTrue("not an ARecord", recs[0] instanceof AAAARecord);
}
@Test
public void testNegativeLookup() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Name name = Name.fromString("missing.hwx.test.");
Record question = Record.newRecord(name, Type.A, DClass.IN);
Message query = Message.newQuery(question);
byte[] responseBytes = getRegistryDNS().generateReply(query, null);
Message response = new Message(responseBytes);
assertEquals("not successful", Rcode.NXDOMAIN, response.getRcode());
assertNotNull("Null response", response);
assertEquals("Questions do not match", query.getQuestion(),
response.getQuestion());
Record[] sectionArray = response.getSectionArray(Section.AUTHORITY);
assertEquals("Wrong number of recs in AUTHORITY", isSecure() ? 2 : 1,
sectionArray.length);
boolean soaFound = false;
for (Record rec : sectionArray) {
soaFound = rec.getType() == Type.SOA;
if (soaFound) {
break;
}
}
assertTrue("wrong record type",
soaFound);
}
@Test
public void testReadMasterFile() throws Exception {
setRegistryDNS(new RegistryDNS("TestRegistry"));
Configuration conf = new Configuration();
conf.set(RegistryConstants.KEY_DNS_DOMAIN, "hwx.test");
conf.set(RegistryConstants.KEY_DNS_ZONE_SUBNET, "172.17.0");
conf.setTimeDuration(RegistryConstants.KEY_DNS_TTL, 30L, TimeUnit.SECONDS);
conf.set(RegistryConstants.KEY_DNS_ZONES_DIR,
getClass().getResource("/").getFile());
if (isSecure()) {
conf.setBoolean(RegistryConstants.KEY_DNSSEC_ENABLED, true);
conf.set(RegistryConstants.KEY_DNSSEC_PUBLIC_KEY,
"AwEAAe1Jev0Az1khlQCvf0nud1/CNHQwwPEu8BNchZthdDxKPVn29yrD "
+ "CHoAWjwiGsOSw3SzIPrawSbHzyJsjn0oLBhGrH6QedFGnydoxjNsw3m/ "
+ "SCmOjR/a7LGBAMDFKqFioi4gOyuN66svBeY+/5uw72+0ei9AQ20gqf6q "
+ "l9Ozs5bV");
conf.set(RegistryConstants.KEY_DNSSEC_PRIVATE_KEY_FILE,
getClass().getResource("/test.private").getFile());
}
getRegistryDNS().setDomainName(conf);
getRegistryDNS().initializeZones(conf);
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "container-e50-1451931954322-0016-01-000002",
record);
// start assessing whether correct records are available
Record[] recs =
assertDNSQuery("ctr-e50-1451931954322-0016-01-000002.hwx.test.");
assertEquals("wrong result", "172.17.0.19",
((ARecord) recs[0]).getAddress().getHostAddress());
recs = assertDNSQuery("ycloud.test1.root.hwx.test.", 1);
assertTrue("not an ARecord", recs[0] instanceof ARecord);
// lookup dyanmic reverse records
recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
assertEquals("wrong result",
"ctr-e50-1451931954322-0016-01-000002.hwx.test.",
((PTRRecord) recs[0]).getTarget().toString());
// now lookup static reverse records
Name name = Name.fromString("5.0.17.172.in-addr.arpa.");
Record question = Record.newRecord(name, Type.PTR, DClass.IN);
Message query = Message.newQuery(question);
OPTRecord optRecord = new OPTRecord(4096, 0, 0, Flags.DO, null);
query.addRecord(optRecord, Section.ADDITIONAL);
byte[] responseBytes = getRegistryDNS().generateReply(query, null);
Message response = new Message(responseBytes);
recs = response.getSectionArray(Section.ANSWER);
assertEquals("wrong result", "cn005.hwx.test.",
((PTRRecord) recs[0]).getTarget().toString());
}
@Test
public void testReverseZoneNames() throws Exception {
Configuration conf = new Configuration();
conf.set(KEY_DNS_ZONE_SUBNET, "172.26.32.0");
conf.set(KEY_DNS_ZONE_MASK, "255.255.224.0");
Name name = getRegistryDNS().getReverseZoneName(conf);
assertEquals("wrong name", "26.172.in-addr.arpa.", name.toString());
}
public RegistryDNS getRegistryDNS() {
return registryDNS;
}
public void setRegistryDNS(
RegistryDNS registryDNS) {
this.registryDNS = registryDNS;
}
public RegistryUtils.ServiceRecordMarshal getMarshal() {
return marshal;
}
public void setMarshal(
RegistryUtils.ServiceRecordMarshal marshal) {
this.marshal = marshal;
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.api.RegistryConstants;
/**
*
*/
public class TestSecureRegistryDNS extends TestRegistryDNS {
@Override protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(RegistryConstants.KEY_DNSSEC_ENABLED, true);
conf.set(RegistryConstants.KEY_DNSSEC_PUBLIC_KEY,
"AwEAAe1Jev0Az1khlQCvf0nud1/CNHQwwPEu8BNchZthdDxKPVn29yrD "
+ "CHoAWjwiGsOSw3SzIPrawSbHzyJsjn0oLBhGrH6QedFGnydoxjNsw3m/ "
+ "SCmOjR/a7LGBAMDFKqFioi4gOyuN66svBeY+/5uw72+0ei9AQ20gqf6q "
+ "l9Ozs5bV");
conf.set(RegistryConstants.KEY_DNSSEC_PRIVATE_KEY_FILE,
getClass().getResource("/test.private").getFile());
return conf;
}
@Override protected boolean isSecure() {
return true;
}
}

View File

@ -0,0 +1,36 @@
;
; Licensed to the Apache Software Foundation (ASF) under one
; or more contributor license agreements. See the NOTICE file
; distributed with this work for additional information
; regarding copyright ownership. The ASF licenses this file
; to you under the Apache License, Version 2.0 (the
; "License"); you may not use this file except in compliance
; with the License. You may obtain a copy of the License at
;
; http://www.apache.org/licenses/LICENSE-2.0
;
; Unless required by applicable law or agreed to in writing, software
; distributed under the License is distributed on an "AS IS" BASIS,
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
; See the License for the specific language governing permissions and
; limitations under the License.
;
;
$ORIGIN .
$TTL 1800 ; 30 minutes
0.17.172.in-addr.arpa IN SOA ns.hwhq.hortonworks.com. it.hortonworks.com. (
2015081000 ; serial
10800 ; refresh (3 hours)
900 ; retry (15 minutes)
1814400 ; expire (3 weeks)
10800 ; minimum (3 hours)
)
NS ns.hwhq.hortonworks.com.
NS ns2.hwhq.hortonworks.com.
$ORIGIN 0.17.172.in-addr.arpa.
5 PTR cn005.hwx.test.
6 PTR cn006.hwx.test.
7 PTR cn007.hwx.test.
8 PTR cn008.hwx.test.
9 PTR cn009.hwx.test.

View File

@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Private-key-format: v1.3
Algorithm: 8 (RSASHA256)
Modulus: 7Ul6/QDPWSGVAK9/Se53X8I0dDDA8S7wE1yFm2F0PEo9Wfb3KsMIegBaPCIaw5LDdLMg+trBJsfPImyOfSgsGEasfpB50UafJ2jGM2zDeb9IKY6NH9rssYEAwMUqoWKiLiA7K43rqy8F5j7/m7Dvb7R6L0BDbSCp/qqX07OzltU=
PublicExponent: AQAB
PrivateExponent: MgbQ6DBYhskeufNGGdct0cGG/4wb0X183ggenwCv2dopDyOTPq+5xMb4Pz9Ndzgk/yCY7mpaWIu9rttGOzrR+LBRR30VobPpMK1bMnzu2C0x08oYAguVwZB79DLC705qmZpiaaFB+LnhG7VtpPiOBm3UzZxdrBfeq/qaKrXid60=
Prime1: /HFdjI4cRuJBjK9IGWWmmVZWwaFsQYO9GHLCDwjm691GxaDpXuMdPd0uH9EqQvskyF8JPmzQXI43swyUFjizow==
Prime2: 8KFxkWEHlhgB2GLi8tk39TKY5vmFUvh4FO28COl1N/rWjKVpfM1p6HQ6YavoGNZQmDBazv4WOZRqSQukHApzJw==
Exponent1: alX+h/RcqOcpoW88OaZ99N1PkiTDCx3JC4FbiSXAz93Xr+vGIfgdGzAN+80JtklABz8xD6CabEJj6AIGZw3fbQ==
Exponent2: vvPusqZkJcjBVh0K6hpUXKEdU1W5ZmFEsZ8Cs7PH0Hee4Je3QVGk9NGfLrkDgwo3hL4CofZiXqkXOwYg4husyw==
Coefficient: omxpbNU6u/swbnkTC6MicaDqbJP7ETnCCJ1iN2+HZO/AlQCFlqVzLwGZmvGMAGA9ZWF+YpqpPhvzi4bWmi5XrQ==
Created: 20160119155251
Publish: 20160119155251
Activate: 20160119155251