YARN-2768 Improved Yarn Registry service record structure (stevel)
This commit is contained in:
parent
f5b19bed7d
commit
1670578018
@ -740,6 +740,8 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2677 registry punycoding of usernames doesn't fix all usernames to be
|
||||
DNS-valid (stevel)
|
||||
|
||||
YARN-2768 Improved Yarn Registry service record structure (stevel)
|
||||
|
||||
---
|
||||
|
||||
YARN-2598 GHS should show N/A instead of null for the inaccessible information
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
@ -174,24 +175,22 @@ public int resolve(String [] args) {
|
||||
ServiceRecord record = registry.resolve(argsList.get(1));
|
||||
|
||||
for (Endpoint endpoint : record.external) {
|
||||
if ((endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_WEBUI))
|
||||
|| (endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_REST))) {
|
||||
sysout.print(" Endpoint(ProtocolType="
|
||||
+ endpoint.protocolType + ", Api="
|
||||
+ endpoint.api + "); Uris are: ");
|
||||
} else {
|
||||
sysout.print(" Endpoint(ProtocolType="
|
||||
sysout.println(" Endpoint(ProtocolType="
|
||||
+ endpoint.protocolType + ", Api="
|
||||
+ endpoint.api + ");"
|
||||
+ " Addresses(AddressType="
|
||||
+ endpoint.addressType + ") are: ");
|
||||
|
||||
}
|
||||
for (List<String> a : endpoint.addresses) {
|
||||
sysout.print(a + " ");
|
||||
}
|
||||
sysout.println();
|
||||
}
|
||||
for (Map<String, String> address : endpoint.addresses) {
|
||||
sysout.println(" [ ");
|
||||
for (Map.Entry<String, String> entry : address.entrySet()) {
|
||||
sysout.println(" " + entry.getKey()
|
||||
+ ": \"" + entry.getValue() + "\"");
|
||||
}
|
||||
sysout.println(" ]");
|
||||
}
|
||||
sysout.println();
|
||||
}
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
syserr.println(analyzeException("resolve", e, argsList));
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.registry.client.binding;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
@ -45,8 +46,6 @@
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Support for marshalling objects to and from JSON.
|
||||
@ -62,30 +61,30 @@ public class JsonSerDeser<T> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeser.class);
|
||||
private static final String UTF_8 = "UTF-8";
|
||||
public static final String E_NO_SERVICE_RECORD = "No service record at path";
|
||||
public static final String E_NO_DATA = "No data at path";
|
||||
public static final String E_DATA_TOO_SHORT = "Data at path too short";
|
||||
public static final String E_MISSING_MARKER_STRING =
|
||||
"Missing marker string: ";
|
||||
|
||||
private final Class<T> classType;
|
||||
private final ObjectMapper mapper;
|
||||
private final byte[] header;
|
||||
|
||||
/**
|
||||
* Create an instance bound to a specific type
|
||||
* @param classType class to marshall
|
||||
* @param header byte array to use as header
|
||||
*/
|
||||
public JsonSerDeser(Class<T> classType, byte[] header) {
|
||||
public JsonSerDeser(Class<T> classType) {
|
||||
Preconditions.checkArgument(classType != null, "null classType");
|
||||
Preconditions.checkArgument(header != null, "null header");
|
||||
this.classType = classType;
|
||||
this.mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
|
||||
false);
|
||||
// make an immutable copy to keep findbugs happy.
|
||||
byte[] h = new byte[header.length];
|
||||
System.arraycopy(header, 0, h, 0, header.length);
|
||||
this.header = h;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the simple name of the class type to be marshalled
|
||||
* @return the name of the class being marshalled
|
||||
*/
|
||||
public String getName() {
|
||||
return classType.getSimpleName();
|
||||
}
|
||||
@ -183,7 +182,7 @@ public T load(FileSystem fs, Path path)
|
||||
if (count != len) {
|
||||
throw new EOFException(path.toString() + ": read finished prematurely");
|
||||
}
|
||||
return fromBytes(path.toString(), b, 0);
|
||||
return fromBytes(path.toString(), b);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -206,8 +205,7 @@ public void save(FileSystem fs, Path path, T instance,
|
||||
* @throws IOException on any failure
|
||||
*/
|
||||
private void writeJsonAsBytes(T instance,
|
||||
DataOutputStream dataOutputStream) throws
|
||||
IOException {
|
||||
DataOutputStream dataOutputStream) throws IOException {
|
||||
try {
|
||||
byte[] b = toBytes(instance);
|
||||
dataOutputStream.write(b);
|
||||
@ -227,37 +225,51 @@ public byte[] toBytes(T instance) throws IOException {
|
||||
return json.getBytes(UTF_8);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert JSON To bytes, inserting the header
|
||||
* @param instance instance to convert
|
||||
* @return a byte array
|
||||
* @throws IOException
|
||||
*/
|
||||
public byte[] toByteswithHeader(T instance) throws IOException {
|
||||
byte[] body = toBytes(instance);
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.allocate(body.length + header.length);
|
||||
buffer.put(header);
|
||||
buffer.put(body);
|
||||
return buffer.array();
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize from a byte array
|
||||
* @param path path the data came from
|
||||
* @param bytes byte array
|
||||
* @return offset in the array to read from
|
||||
* @throws IOException all problems
|
||||
* @throws EOFException not enough data
|
||||
* @throws InvalidRecordException if the parsing failed -the record is invalid
|
||||
*/
|
||||
public T fromBytes(String path, byte[] bytes, int offset) throws IOException,
|
||||
public T fromBytes(String path, byte[] bytes) throws IOException,
|
||||
InvalidRecordException {
|
||||
int data = bytes.length - offset;
|
||||
if (data <= 0) {
|
||||
throw new EOFException("No data at " + path);
|
||||
return fromBytes(path, bytes, "");
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize from a byte array, optionally checking for a marker string.
|
||||
* <p>
|
||||
* If the marker parameter is supplied (and not empty), then its presence
|
||||
* will be verified before the JSON parsing takes place; it is a fast-fail
|
||||
* check. If not found, an {@link InvalidRecordException} exception will be
|
||||
* raised
|
||||
* @param path path the data came from
|
||||
* @param bytes byte array
|
||||
* @param marker an optional string which, if set, MUST be present in the
|
||||
* UTF-8 parsed payload.
|
||||
* @return The parsed record
|
||||
* @throws IOException all problems
|
||||
* @throws EOFException not enough data
|
||||
* @throws InvalidRecordException if the JSON parsing failed.
|
||||
* @throws NoRecordException if the data is not considered a record: either
|
||||
* it is too short or it did not contain the marker string.
|
||||
*/
|
||||
public T fromBytes(String path, byte[] bytes, String marker)
|
||||
throws IOException, NoRecordException, InvalidRecordException {
|
||||
int len = bytes.length;
|
||||
if (len == 0 ) {
|
||||
throw new NoRecordException(path, E_NO_DATA);
|
||||
}
|
||||
if (StringUtils.isNotEmpty(marker) && len < marker.length()) {
|
||||
throw new NoRecordException(path, E_DATA_TOO_SHORT);
|
||||
}
|
||||
String json = new String(bytes, 0, len, UTF_8);
|
||||
if (StringUtils.isNotEmpty(marker)
|
||||
&& !json.contains(marker)) {
|
||||
throw new NoRecordException(path, E_MISSING_MARKER_STRING + marker);
|
||||
}
|
||||
String json = new String(bytes, offset, data, UTF_8);
|
||||
try {
|
||||
return fromJson(json);
|
||||
} catch (JsonProcessingException e) {
|
||||
@ -266,52 +278,7 @@ public T fromBytes(String path, byte[] bytes, int offset) throws IOException,
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from a byte array to a type, checking the header first
|
||||
* @param path source of data
|
||||
* @param buffer buffer
|
||||
* @return the parsed structure
|
||||
* Null if the record was too short or the header did not match
|
||||
* @throws IOException on a failure
|
||||
* @throws NoRecordException if header checks implied there was no record
|
||||
* @throws InvalidRecordException if record parsing failed
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public T fromBytesWithHeader(String path, byte[] buffer) throws IOException {
|
||||
int hlen = header.length;
|
||||
int blen = buffer.length;
|
||||
if (hlen > 0) {
|
||||
if (blen < hlen) {
|
||||
throw new NoRecordException(path, E_NO_SERVICE_RECORD);
|
||||
}
|
||||
byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
|
||||
if (!Arrays.equals(header, magic)) {
|
||||
LOG.debug("start of entry does not match service record header at {}",
|
||||
path);
|
||||
throw new NoRecordException(path, E_NO_SERVICE_RECORD);
|
||||
}
|
||||
}
|
||||
return fromBytes(path, buffer, hlen);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a buffer has a header which matches this record type
|
||||
* @param buffer buffer
|
||||
* @return true if there is a match
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean headerMatches(byte[] buffer) throws IOException {
|
||||
int hlen = header.length;
|
||||
int blen = buffer.length;
|
||||
boolean matches = false;
|
||||
if (blen > hlen) {
|
||||
byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
|
||||
matches = Arrays.equals(header, magic);
|
||||
}
|
||||
return matches;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an object to a JSON string
|
||||
* Convert an instance to a JSON string
|
||||
* @param instance instance to convert
|
||||
* @return a JSON string description
|
||||
* @throws JsonParseException parse problems
|
||||
@ -324,4 +291,19 @@ public synchronized String toJson(T instance) throws IOException,
|
||||
return mapper.writeValueAsString(instance);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an instance to a string form for output. This is a robust
|
||||
* operation which will convert any JSON-generating exceptions into
|
||||
* error text.
|
||||
* @param instance non-null instance
|
||||
* @return a JSON string
|
||||
*/
|
||||
public String toString(T instance) {
|
||||
Preconditions.checkArgument(instance != null, "Null instance argument");
|
||||
try {
|
||||
return toJson(instance);
|
||||
} catch (IOException e) {
|
||||
return "Failed to convert to a string: " + e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,17 +22,19 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
|
||||
import org.apache.hadoop.registry.client.types.AddressTypes;
|
||||
import static org.apache.hadoop.registry.client.types.AddressTypes.*;
|
||||
import org.apache.hadoop.registry.client.types.Endpoint;
|
||||
import org.apache.hadoop.registry.client.types.ProtocolTypes;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Static methods to work with registry types —primarily endpoints and the
|
||||
@ -94,79 +96,66 @@ public static Endpoint inetAddrEndpoint(String api,
|
||||
Preconditions.checkArgument(protocolType != null, "null protocolType");
|
||||
Preconditions.checkArgument(hostname != null, "null hostname");
|
||||
return new Endpoint(api,
|
||||
AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
|
||||
ADDRESS_HOSTNAME_AND_PORT,
|
||||
protocolType,
|
||||
tuplelist(hostname, Integer.toString(port)));
|
||||
hostnamePortPair(hostname, port));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an IPC endpoint
|
||||
* @param api API
|
||||
* @param protobuf flag to indicate whether or not the IPC uses protocol
|
||||
* buffers
|
||||
* @param address the address as a tuple of (hostname, port)
|
||||
* @return the new endpoint
|
||||
*/
|
||||
public static Endpoint ipcEndpoint(String api,
|
||||
boolean protobuf, List<String> address) {
|
||||
ArrayList<List<String>> addressList = new ArrayList<List<String>>();
|
||||
if (address != null) {
|
||||
addressList.add(address);
|
||||
}
|
||||
public static Endpoint ipcEndpoint(String api, InetSocketAddress address) {
|
||||
return new Endpoint(api,
|
||||
AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
|
||||
protobuf ? ProtocolTypes.PROTOCOL_HADOOP_IPC_PROTOBUF
|
||||
: ProtocolTypes.PROTOCOL_HADOOP_IPC,
|
||||
addressList);
|
||||
ADDRESS_HOSTNAME_AND_PORT,
|
||||
ProtocolTypes.PROTOCOL_HADOOP_IPC,
|
||||
address== null ? null: hostnamePortPair(address));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a single-element list of tuples from the input.
|
||||
* that is, an input ("a","b","c") is converted into a list
|
||||
* in the form [["a","b","c"]]
|
||||
* @param t1 tuple elements
|
||||
* @return a list containing a single tuple
|
||||
* Create a single entry map
|
||||
* @param key map entry key
|
||||
* @param val map entry value
|
||||
* @return a 1 entry map.
|
||||
*/
|
||||
public static List<List<String>> tuplelist(String... t1) {
|
||||
List<List<String>> outer = new ArrayList<List<String>>();
|
||||
outer.add(tuple(t1));
|
||||
return outer;
|
||||
public static Map<String, String> map(String key, String val) {
|
||||
Map<String, String> map = new HashMap<String, String>(1);
|
||||
map.put(key, val);
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a tuples from the input.
|
||||
* that is, an input ("a","b","c") is converted into a list
|
||||
* in the form ["a","b","c"]
|
||||
* @param t1 tuple elements
|
||||
* @return a single tuple as a list
|
||||
* Create a URI
|
||||
* @param uri value
|
||||
* @return a 1 entry map.
|
||||
*/
|
||||
public static List<String> tuple(String... t1) {
|
||||
return Arrays.asList(t1);
|
||||
public static Map<String, String> uri(String uri) {
|
||||
return map(ADDRESS_URI, uri);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a tuples from the input, converting all to Strings in the process
|
||||
* that is, an input ("a", 7, true) is converted into a list
|
||||
* in the form ["a","7,"true"]
|
||||
* @param t1 tuple elements
|
||||
* @return a single tuple as a list
|
||||
* Create a (hostname, port) address pair
|
||||
* @param hostname hostname
|
||||
* @param port port
|
||||
* @return a 1 entry map.
|
||||
*/
|
||||
public static List<String> tuple(Object... t1) {
|
||||
List<String> l = new ArrayList<String>(t1.length);
|
||||
for (Object t : t1) {
|
||||
l.add(t.toString());
|
||||
}
|
||||
return l;
|
||||
public static Map<String, String> hostnamePortPair(String hostname, int port) {
|
||||
Map<String, String> map =
|
||||
map(ADDRESS_HOSTNAME_FIELD, hostname);
|
||||
map.put(ADDRESS_PORT_FIELD, Integer.toString(port));
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a socket address pair into a string tuple, (host, port).
|
||||
* TODO JDK7: move to InetAddress.getHostString() to avoid DNS lookups.
|
||||
* @param address an address
|
||||
* @return an element for the address list
|
||||
* Create a (hostname, port) address pair
|
||||
* @param address socket address whose hostname and port are used for the
|
||||
* generated address.
|
||||
* @return a 1 entry map.
|
||||
*/
|
||||
public static List<String> marshall(InetSocketAddress address) {
|
||||
return tuple(address.getHostName(), address.getPort());
|
||||
public static Map<String, String> hostnamePortPair(InetSocketAddress address) {
|
||||
return hostnamePortPair(address.getHostName(), address.getPort());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -199,24 +188,36 @@ public static List<String> retrieveAddressesUriType(Endpoint epr)
|
||||
if (epr == null) {
|
||||
return null;
|
||||
}
|
||||
requireAddressType(AddressTypes.ADDRESS_URI, epr);
|
||||
List<List<String>> addresses = epr.addresses;
|
||||
requireAddressType(ADDRESS_URI, epr);
|
||||
List<Map<String, String>> addresses = epr.addresses;
|
||||
if (addresses.size() < 1) {
|
||||
throw new InvalidRecordException(epr.toString(),
|
||||
"No addresses in endpoint");
|
||||
}
|
||||
List<String> results = new ArrayList<String>(addresses.size());
|
||||
for (List<String> address : addresses) {
|
||||
if (address.size() != 1) {
|
||||
throw new InvalidRecordException(epr.toString(),
|
||||
"Address payload invalid: wrong element count: " +
|
||||
address.size());
|
||||
}
|
||||
results.add(address.get(0));
|
||||
for (Map<String, String> address : addresses) {
|
||||
results.add(getAddressField(address, ADDRESS_URI));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a specific field from an address -raising an exception if
|
||||
* the field is not present
|
||||
* @param address address to query
|
||||
* @param field field to resolve
|
||||
* @return the resolved value. Guaranteed to be non-null.
|
||||
* @throws InvalidRecordException if the field did not resolve
|
||||
*/
|
||||
public static String getAddressField(Map<String, String> address,
|
||||
String field) throws InvalidRecordException {
|
||||
String val = address.get(field);
|
||||
if (val == null) {
|
||||
throw new InvalidRecordException("", "Missing address field: " + field);
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the address URLs. Guranteed to return at least one address.
|
||||
* @param epr endpoint
|
||||
@ -237,4 +238,53 @@ public static List<URL> retrieveAddressURLs(Endpoint epr)
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the record by checking for null fields and other invalid
|
||||
* conditions
|
||||
* @param path path for exceptions
|
||||
* @param record record to validate. May be null
|
||||
* @throws InvalidRecordException on invalid entries
|
||||
*/
|
||||
public static void validateServiceRecord(String path, ServiceRecord record)
|
||||
throws InvalidRecordException {
|
||||
if (record == null) {
|
||||
throw new InvalidRecordException(path, "Null record");
|
||||
}
|
||||
if (!ServiceRecord.RECORD_TYPE.equals(record.type)) {
|
||||
throw new InvalidRecordException(path,
|
||||
"invalid record type field: \"" + record.type + "\"");
|
||||
}
|
||||
|
||||
if (record.external != null) {
|
||||
for (Endpoint endpoint : record.external) {
|
||||
validateEndpoint(path, endpoint);
|
||||
}
|
||||
}
|
||||
if (record.internal != null) {
|
||||
for (Endpoint endpoint : record.internal) {
|
||||
validateEndpoint(path, endpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the endpoint by checking for null fields and other invalid
|
||||
* conditions
|
||||
* @param path path for exceptions
|
||||
* @param endpoint endpoint to validate. May be null
|
||||
* @throws InvalidRecordException on invalid entries
|
||||
*/
|
||||
public static void validateEndpoint(String path, Endpoint endpoint)
|
||||
throws InvalidRecordException {
|
||||
if (endpoint == null) {
|
||||
throw new InvalidRecordException(path, "Null endpoint");
|
||||
}
|
||||
try {
|
||||
endpoint.validate();
|
||||
} catch (RuntimeException e) {
|
||||
throw new InvalidRecordException(path, e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -33,7 +33,6 @@
|
||||
import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
|
||||
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -314,7 +313,7 @@ public static Map<String, ServiceRecord> extractServiceRecords(
|
||||
Collection<RegistryPathStatus> stats) throws IOException {
|
||||
Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size());
|
||||
for (RegistryPathStatus stat : stats) {
|
||||
if (stat.size > ServiceRecordHeader.getLength()) {
|
||||
if (stat.size > ServiceRecord.RECORD_TYPE.length()) {
|
||||
// maybe has data
|
||||
String path = join(parentpath, stat.path);
|
||||
try {
|
||||
@ -344,7 +343,6 @@ public static Map<String, ServiceRecord> extractServiceRecords(
|
||||
* <p>
|
||||
* @param operations operation support for fetches
|
||||
* @param parentpath path of the parent of all the entries
|
||||
* @param stats a map of name:value mappings.
|
||||
* @return a possibly empty map of fullpath:record.
|
||||
* @throws IOException for any IO Operation that wasn't ignored.
|
||||
*/
|
||||
@ -362,7 +360,6 @@ public static Map<String, ServiceRecord> extractServiceRecords(
|
||||
* <p>
|
||||
* @param operations operation support for fetches
|
||||
* @param parentpath path of the parent of all the entries
|
||||
* @param stats a map of name:value mappings.
|
||||
* @return a possibly empty map of fullpath:record.
|
||||
* @throws IOException for any IO Operation that wasn't ignored.
|
||||
*/
|
||||
@ -382,7 +379,7 @@ public static Map<String, ServiceRecord> extractServiceRecords(
|
||||
*/
|
||||
public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> {
|
||||
public ServiceRecordMarshal() {
|
||||
super(ServiceRecord.class, ServiceRecordHeader.getData());
|
||||
super(ServiceRecord.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,17 +21,11 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
|
||||
|
||||
/**
|
||||
* Raised if there is no {@link ServiceRecord} resolved at the end
|
||||
* of the specified path, for reasons such as:
|
||||
* <ul>
|
||||
* <li>There wasn't enough data to contain a Service Record.</li>
|
||||
* <li>The start of the data did not match the {@link ServiceRecordHeader}
|
||||
* header.</li>
|
||||
* </ul>
|
||||
*
|
||||
* of the specified path.
|
||||
* <p>
|
||||
* There may be valid data of some form at the end of the path, but it does
|
||||
* not appear to be a Service Record.
|
||||
*/
|
||||
|
@ -24,9 +24,11 @@
|
||||
import org.apache.hadoop.registry.client.api.BindFlags;
|
||||
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
||||
|
||||
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
|
||||
import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
|
||||
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
|
||||
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
@ -103,10 +105,12 @@ public void bind(String path,
|
||||
int flags) throws IOException {
|
||||
Preconditions.checkArgument(record != null, "null record");
|
||||
validatePath(path);
|
||||
// validate the record before putting it
|
||||
RegistryTypeUtils.validateServiceRecord(path, record);
|
||||
LOG.info("Bound at {} : {}", path, record);
|
||||
|
||||
CreateMode mode = CreateMode.PERSISTENT;
|
||||
byte[] bytes = serviceRecordMarshal.toByteswithHeader(record);
|
||||
byte[] bytes = serviceRecordMarshal.toBytes(record);
|
||||
zkSet(path, mode, bytes, getClientAcls(),
|
||||
((flags & BindFlags.OVERWRITE) != 0));
|
||||
}
|
||||
@ -114,7 +118,11 @@ public void bind(String path,
|
||||
@Override
|
||||
public ServiceRecord resolve(String path) throws IOException {
|
||||
byte[] bytes = zkRead(path);
|
||||
return serviceRecordMarshal.fromBytesWithHeader(path, bytes);
|
||||
|
||||
ServiceRecord record = serviceRecordMarshal.fromBytes(path,
|
||||
bytes, ServiceRecord.RECORD_TYPE);
|
||||
RegistryTypeUtils.validateServiceRecord(path, record);
|
||||
return record;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -38,6 +38,8 @@ public interface AddressTypes {
|
||||
* </pre>
|
||||
*/
|
||||
public static final String ADDRESS_HOSTNAME_AND_PORT = "host/port";
|
||||
public static final String ADDRESS_HOSTNAME_FIELD = "host";
|
||||
public static final String ADDRESS_PORT_FIELD = "port";
|
||||
|
||||
|
||||
/**
|
||||
|
@ -21,14 +21,16 @@
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.registry.client.binding.JsonSerDeser;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
|
||||
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||
import org.codehaus.jackson.map.annotate.JsonSerialize;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Description of a single service/component endpoint.
|
||||
@ -67,7 +69,7 @@ public final class Endpoint implements Cloneable {
|
||||
/**
|
||||
* a list of address tuples —tuples whose format depends on the address type
|
||||
*/
|
||||
public List<List<String>> addresses;
|
||||
public List<Map<String, String>> addresses;
|
||||
|
||||
/**
|
||||
* Create an empty instance.
|
||||
@ -84,10 +86,11 @@ public Endpoint(Endpoint that) {
|
||||
this.api = that.api;
|
||||
this.addressType = that.addressType;
|
||||
this.protocolType = that.protocolType;
|
||||
this.addresses = new ArrayList<List<String>>(that.addresses.size());
|
||||
for (List<String> address : addresses) {
|
||||
List<String> addr2 = new ArrayList<String>(address.size());
|
||||
Collections.copy(address, addr2);
|
||||
this.addresses = newAddresses(that.addresses.size());
|
||||
for (Map<String, String> address : that.addresses) {
|
||||
Map<String, String> addr2 = new HashMap<String, String>(address.size());
|
||||
addr2.putAll(address);
|
||||
addresses.add(addr2);
|
||||
}
|
||||
}
|
||||
|
||||
@ -101,16 +104,82 @@ public Endpoint(Endpoint that) {
|
||||
public Endpoint(String api,
|
||||
String addressType,
|
||||
String protocolType,
|
||||
List<List<String>> addrs) {
|
||||
List<Map<String, String>> addrs) {
|
||||
this.api = api;
|
||||
this.addressType = addressType;
|
||||
this.protocolType = protocolType;
|
||||
this.addresses = new ArrayList<List<String>>();
|
||||
this.addresses = newAddresses(0);
|
||||
if (addrs != null) {
|
||||
addresses.addAll(addrs);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an endpoint with an empty address list
|
||||
* @param api API name
|
||||
* @param addressType address type
|
||||
* @param protocolType protocol type
|
||||
*/
|
||||
public Endpoint(String api,
|
||||
String addressType,
|
||||
String protocolType) {
|
||||
this.api = api;
|
||||
this.addressType = addressType;
|
||||
this.protocolType = protocolType;
|
||||
this.addresses = newAddresses(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an endpoint with a single address entry.
|
||||
* <p>
|
||||
* This constructor is superfluous given the varags constructor is equivalent
|
||||
* for a single element argument. However, type-erasure in java generics
|
||||
* causes javac to warn about unchecked generic array creation. This
|
||||
* constructor, which represents the common "one address" case, does
|
||||
* not generate compile-time warnings.
|
||||
* @param api API name
|
||||
* @param addressType address type
|
||||
* @param protocolType protocol type
|
||||
* @param addr address. May be null —in which case it is not added
|
||||
*/
|
||||
public Endpoint(String api,
|
||||
String addressType,
|
||||
String protocolType,
|
||||
Map<String, String> addr) {
|
||||
this(api, addressType, protocolType);
|
||||
if (addr != null) {
|
||||
addresses.add(addr);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an endpoint with a list of addresses
|
||||
* @param api API name
|
||||
* @param addressType address type
|
||||
* @param protocolType protocol type
|
||||
* @param addrs addresses. Null elements will be skipped
|
||||
*/
|
||||
public Endpoint(String api,
|
||||
String addressType,
|
||||
String protocolType,
|
||||
Map<String, String>...addrs) {
|
||||
this(api, addressType, protocolType);
|
||||
for (Map<String, String> addr : addrs) {
|
||||
if (addr!=null) {
|
||||
addresses.add(addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new address structure of the requested size
|
||||
* @param size size to create
|
||||
* @return the new list
|
||||
*/
|
||||
private List<Map<String, String>> newAddresses(int size) {
|
||||
return new ArrayList<Map<String, String>>(size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an endpoint from a list of URIs; each URI
|
||||
* is ASCII-encoded and added to the list of addresses.
|
||||
@ -125,40 +194,16 @@ public Endpoint(String api,
|
||||
this.addressType = AddressTypes.ADDRESS_URI;
|
||||
|
||||
this.protocolType = protocolType;
|
||||
List<List<String>> addrs = new ArrayList<List<String>>(uris.length);
|
||||
List<Map<String, String>> addrs = newAddresses(uris.length);
|
||||
for (URI uri : uris) {
|
||||
addrs.add(RegistryTypeUtils.tuple(uri.toString()));
|
||||
addrs.add(RegistryTypeUtils.uri(uri.toString()));
|
||||
}
|
||||
this.addresses = addrs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder("Endpoint{");
|
||||
sb.append("api='").append(api).append('\'');
|
||||
sb.append(", addressType='").append(addressType).append('\'');
|
||||
sb.append(", protocolType='").append(protocolType).append('\'');
|
||||
|
||||
sb.append(", addresses=");
|
||||
if (addresses != null) {
|
||||
sb.append("[ ");
|
||||
for (List<String> address : addresses) {
|
||||
sb.append("[ ");
|
||||
if (address == null) {
|
||||
sb.append("NULL entry in address list");
|
||||
} else {
|
||||
for (String elt : address) {
|
||||
sb.append('"').append(elt).append("\" ");
|
||||
}
|
||||
}
|
||||
sb.append("] ");
|
||||
};
|
||||
sb.append("] ");
|
||||
} else {
|
||||
sb.append("(null) ");
|
||||
}
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
return marshalToString.toString(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -173,7 +218,7 @@ public void validate() {
|
||||
Preconditions.checkNotNull(addressType, "null addressType field");
|
||||
Preconditions.checkNotNull(protocolType, "null protocolType field");
|
||||
Preconditions.checkNotNull(addresses, "null addresses field");
|
||||
for (List<String> address : addresses) {
|
||||
for (Map<String, String> address : addresses) {
|
||||
Preconditions.checkNotNull(address, "null element in address");
|
||||
}
|
||||
}
|
||||
@ -184,7 +229,19 @@ public void validate() {
|
||||
* @throws CloneNotSupportedException
|
||||
*/
|
||||
@Override
|
||||
protected Object clone() throws CloneNotSupportedException {
|
||||
public Object clone() throws CloneNotSupportedException {
|
||||
return super.clone();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Static instance of service record marshalling
|
||||
*/
|
||||
private static class Marshal extends JsonSerDeser<Endpoint> {
|
||||
private Marshal() {
|
||||
super(Endpoint.class);
|
||||
}
|
||||
}
|
||||
|
||||
private static final Marshal marshalToString = new Marshal();
|
||||
}
|
||||
|
@ -34,15 +34,10 @@ public interface ProtocolTypes {
|
||||
String PROTOCOL_FILESYSTEM = "hadoop/filesystem";
|
||||
|
||||
/**
|
||||
* Classic Hadoop IPC : {@value}.
|
||||
* Hadoop IPC, "classic" or protobuf : {@value}.
|
||||
*/
|
||||
String PROTOCOL_HADOOP_IPC = "hadoop/IPC";
|
||||
|
||||
/**
|
||||
* Hadoop protocol buffers IPC: {@value}.
|
||||
*/
|
||||
String PROTOCOL_HADOOP_IPC_PROTOBUF = "hadoop/protobuf";
|
||||
|
||||
/**
|
||||
* Corba IIOP: {@value}.
|
||||
*/
|
||||
|
@ -21,6 +21,7 @@
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
|
||||
import org.codehaus.jackson.annotate.JsonAnyGetter;
|
||||
import org.codehaus.jackson.annotate.JsonAnySetter;
|
||||
import org.codehaus.jackson.map.annotate.JsonSerialize;
|
||||
@ -40,6 +41,17 @@
|
||||
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
|
||||
public class ServiceRecord implements Cloneable {
|
||||
|
||||
/**
|
||||
* A type string which MUST be in the serialized json. This permits
|
||||
* fast discarding of invalid entries
|
||||
*/
|
||||
public static final String RECORD_TYPE = "JSONServiceRecord";
|
||||
|
||||
/**
|
||||
* The type field. This must be the string {@link #RECORD_TYPE}
|
||||
*/
|
||||
public String type = RECORD_TYPE;
|
||||
|
||||
/**
|
||||
* Description string
|
||||
*/
|
||||
@ -233,17 +245,5 @@ protected Object clone() throws CloneNotSupportedException {
|
||||
return super.clone();
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the record by checking for null fields and other invalid
|
||||
* conditions
|
||||
* @throws NullPointerException if a field is null when it
|
||||
* MUST be set.
|
||||
* @throws RuntimeException on invalid entries
|
||||
*/
|
||||
public void validate() {
|
||||
for (Endpoint endpoint : external) {
|
||||
Preconditions.checkNotNull("null endpoint", endpoint);
|
||||
endpoint.validate();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,59 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.registry.client.types;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Service record header; access to the byte array kept private
|
||||
* to avoid findbugs warnings of mutability
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class ServiceRecordHeader {
|
||||
/**
|
||||
* Header of a service record: "jsonservicerec"
|
||||
* By making this over 12 bytes long, we can auto-determine which entries
|
||||
* in a listing are too short to contain a record without getting their data
|
||||
*/
|
||||
private static final byte[] RECORD_HEADER = {
|
||||
'j', 's', 'o', 'n',
|
||||
's', 'e', 'r', 'v', 'i', 'c', 'e',
|
||||
'r', 'e', 'c'
|
||||
};
|
||||
|
||||
/**
|
||||
* Get the length of the record header
|
||||
* @return the header length
|
||||
*/
|
||||
public static int getLength() {
|
||||
return RECORD_HEADER.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a clone of the record header
|
||||
* @return the new record header.
|
||||
*/
|
||||
public static byte[] getData() {
|
||||
byte[] h = new byte[RECORD_HEADER.length];
|
||||
System.arraycopy(RECORD_HEADER, 0, h, 0, RECORD_HEADER.length);
|
||||
return h;
|
||||
}
|
||||
}
|
@ -4,6 +4,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
|
||||
|
||||
|
||||
(*
|
||||
============================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -19,6 +20,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
============================================================================
|
||||
*)
|
||||
|
||||
(*
|
||||
@ -71,13 +73,22 @@ CONSTANTS
|
||||
MknodeActions \* all possible mkdir actions
|
||||
|
||||
|
||||
ASSUME PathChars \in STRING
|
||||
ASSUME Paths \in STRING
|
||||
|
||||
(* Data in records is JSON, hence a string *)
|
||||
ASSUME Data \in STRING
|
||||
|
||||
----------------------------------------------------------------------------------------
|
||||
|
||||
(* the registry*)
|
||||
VARIABLE registry
|
||||
|
||||
|
||||
(* Sequence of actions to apply to the registry *)
|
||||
VARIABLE actions
|
||||
|
||||
|
||||
----------------------------------------------------------------------------------------
|
||||
(* Tuple of all variables. *)
|
||||
|
||||
@ -92,7 +103,6 @@ vars == << registry, actions >>
|
||||
|
||||
(* Persistence policy *)
|
||||
PersistPolicySet == {
|
||||
"", \* Undefined; field not present. PERMANENT is implied.
|
||||
"permanent", \* persists until explicitly removed
|
||||
"application", \* persists until the application finishes
|
||||
"application-attempt", \* persists until the application attempt finishes
|
||||
@ -104,7 +114,6 @@ TypeInvariant ==
|
||||
/\ \A p \in PersistPolicies: p \in PersistPolicySet
|
||||
|
||||
|
||||
|
||||
----------------------------------------------------------------------------------------
|
||||
|
||||
|
||||
@ -129,6 +138,14 @@ RegistryEntry == [
|
||||
]
|
||||
|
||||
|
||||
(* Define the set of all string to string mappings *)
|
||||
|
||||
StringMap == [
|
||||
STRING |-> STRING
|
||||
]
|
||||
|
||||
|
||||
|
||||
(*
|
||||
An endpoint in a service record
|
||||
*)
|
||||
@ -140,21 +157,14 @@ Endpoint == [
|
||||
addresses: Addresses
|
||||
]
|
||||
|
||||
(* Attributes are the set of all string to string mappings *)
|
||||
|
||||
Attributes == [
|
||||
STRING |-> STRING
|
||||
]
|
||||
|
||||
(*
|
||||
A service record
|
||||
*)
|
||||
ServiceRecord == [
|
||||
\* ID -used when applying the persistence policy
|
||||
yarn_id: STRING,
|
||||
|
||||
\* the persistence policy
|
||||
yarn_persistence: PersistPolicySet,
|
||||
\* This MUST be present: if it is not then the data is not a service record
|
||||
\* This permits shortcut scan & reject of byte arrays without parsing
|
||||
type: "JSONServiceRecord",
|
||||
|
||||
\*A description
|
||||
description: STRING,
|
||||
@ -166,9 +176,34 @@ ServiceRecord == [
|
||||
internal: Endpoints,
|
||||
|
||||
\* Attributes are a function
|
||||
attributes: Attributes
|
||||
attributes: StringMap
|
||||
]
|
||||
|
||||
----------------------------------------------------------------------------------------
|
||||
|
||||
(*
|
||||
There is an operation serialize whose internals are not defined,
|
||||
Which converts the service records to JSON
|
||||
*)
|
||||
|
||||
CONSTANT serialize(_)
|
||||
|
||||
(* A function which returns true iff the byte stream is considered a valid service record. *)
|
||||
CONSTANT containsServiceRecord(_)
|
||||
|
||||
(* A function to deserialize a string to JSON *)
|
||||
CONSTANT deserialize(_)
|
||||
|
||||
ASSUME \A json \in STRING: containsServiceRecord(json) \in BOOLEAN
|
||||
|
||||
(* Records can be serialized *)
|
||||
ASSUME \A r \in ServiceRecord : serialize(r) \in STRING /\ containsServiceRecord(serialize(r))
|
||||
|
||||
(* All strings for which containsServiceRecord() holds can be deserialized *)
|
||||
ASSUME \A json \in STRING: containsServiceRecord(json) => deserialize(json) \in ServiceRecord
|
||||
|
||||
|
||||
|
||||
|
||||
----------------------------------------------------------------------------------------
|
||||
|
||||
@ -304,8 +339,8 @@ validRegistry(R) ==
|
||||
\* an entry must be the root entry or have a parent entry
|
||||
/\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path))
|
||||
|
||||
\* If the entry has data, it must be a service record
|
||||
/\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords)
|
||||
\* If the entry has data, it must contain a service record
|
||||
/\ \A e \in R: (e.data = << >> \/ containsServiceRecord(e.data))
|
||||
|
||||
|
||||
----------------------------------------------------------------------------------------
|
||||
@ -336,13 +371,13 @@ mknode() adds a new empty entry where there was none before, iff
|
||||
*)
|
||||
|
||||
mknodeSimple(R, path) ==
|
||||
LET record == [ path |-> path, data |-> <<>> ]
|
||||
LET entry == [ path |-> path, data |-> <<>> ]
|
||||
IN \/ exists(R, path)
|
||||
\/ (exists(R, parent(path)) /\ canBind(R, record) /\ (R' = R \union {record} ))
|
||||
\/ (exists(R, parent(path)) /\ canBind(R, entry) /\ (R' = R \union {entry} ))
|
||||
|
||||
|
||||
(*
|
||||
For all parents, the mknodeSimpl() criteria must apply.
|
||||
For all parents, the mknodeSimple() criteria must apply.
|
||||
This could be defined recursively, though as TLA+ does not support recursion,
|
||||
an alternative is required
|
||||
|
||||
@ -350,7 +385,8 @@ an alternative is required
|
||||
Because this specification is declaring the final state of a operation, not
|
||||
the implemental, all that is needed is to describe those parents.
|
||||
|
||||
It declares that the mkdirSimple state applies to the path and all its parents in the set R'
|
||||
It declares that the mknodeSimple() state applies to the path and all
|
||||
its parents in the set R'
|
||||
|
||||
*)
|
||||
mknodeWithParents(R, path) ==
|
||||
@ -402,7 +438,7 @@ purge(R, path, id, persistence) ==
|
||||
=> recursiveDelete(R, p2.path)
|
||||
|
||||
(*
|
||||
resolveRecord() resolves the record at a path or fails.
|
||||
resolveEntry() resolves the record entry at a path or fails.
|
||||
|
||||
It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator
|
||||
is guaranteed to return the single entry of that set, iff the choice predicate holds.
|
||||
@ -411,19 +447,28 @@ Using a predicate of TRUE, it always succeeds, so this function selects
|
||||
the sole entry of the resolve operation.
|
||||
*)
|
||||
|
||||
resolveRecord(R, path) ==
|
||||
resolveEntry(R, path) ==
|
||||
LET l == resolve(R, path) IN
|
||||
/\ Cardinality(l) = 1
|
||||
/\ CHOOSE e \in l : TRUE
|
||||
|
||||
(*
|
||||
Resolve a record by resolving the entry and deserializing the result
|
||||
*)
|
||||
resolveRecord(R, path) ==
|
||||
deserialize(resolveEntry(R, path))
|
||||
|
||||
|
||||
(*
|
||||
The specific action of putting an entry into a record includes validating the record
|
||||
*)
|
||||
|
||||
validRecordToBind(path, record) ==
|
||||
\* The root entry must have permanent persistence
|
||||
isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent"
|
||||
\/ record.attributes["yarn:persistence"] = "")
|
||||
isRootPath(path) => (
|
||||
record.attributes["yarn:persistence"] = "permanent"
|
||||
\/ record.attributes["yarn:persistence"]
|
||||
\/ record.attributes["yarn:persistence"] = {})
|
||||
|
||||
|
||||
(*
|
||||
@ -432,13 +477,12 @@ marshalled as the data in the entry
|
||||
*)
|
||||
bindRecord(R, path, record) ==
|
||||
/\ validRecordToBind(path, record)
|
||||
/\ bind(R, [path |-> path, data |-> record])
|
||||
/\ bind(R, [path |-> path, data |-> serialize(record)])
|
||||
|
||||
|
||||
----------------------------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
(*
|
||||
The action queue can only contain one of the sets of action types, and
|
||||
by giving each a unique name, those sets are guaranteed to be disjoint
|
||||
|
@ -20,7 +20,6 @@
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.registry.client.api.RegistryConstants;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
|
||||
@ -46,11 +45,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint;
|
||||
import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint;
|
||||
import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint;
|
||||
import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple;
|
||||
import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint;
|
||||
import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.*;
|
||||
|
||||
/**
|
||||
* This is a set of static methods to aid testing the registry operations.
|
||||
@ -61,18 +56,18 @@ public class RegistryTestHelper extends Assert {
|
||||
public static final String SC_HADOOP = "org-apache-hadoop";
|
||||
public static final String USER = "devteam/";
|
||||
public static final String NAME = "hdfs";
|
||||
public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs";
|
||||
public static final String API_HDFS = "org_apache_hadoop_namenode_dfs";
|
||||
public static final String API_WEBHDFS = "classpath:org.apache.hadoop.namenode.webhdfs";
|
||||
public static final String API_HDFS = "classpath:org.apache.hadoop.namenode.dfs";
|
||||
public static final String USERPATH = RegistryConstants.PATH_USERS + USER;
|
||||
public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/";
|
||||
public static final String ENTRY_PATH = PARENT_PATH + NAME;
|
||||
public static final String NNIPC = "nnipc";
|
||||
public static final String IPC2 = "IPC2";
|
||||
public static final String NNIPC = "uuid:423C2B93-C927-4050-AEC6-6540E6646437";
|
||||
public static final String IPC2 = "uuid:0663501D-5AD3-4F7E-9419-52F5D6636FCF";
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RegistryTestHelper.class);
|
||||
public static final String KTUTIL = "ktutil";
|
||||
private static final RegistryUtils.ServiceRecordMarshal recordMarshal =
|
||||
new RegistryUtils.ServiceRecordMarshal();
|
||||
public static final String HTTP_API = "http://";
|
||||
|
||||
/**
|
||||
* Assert the path is valid by ZK rules
|
||||
@ -148,9 +143,9 @@ public static void validateEntry(ServiceRecord record) {
|
||||
assertEquals(API_WEBHDFS, webhdfs.api);
|
||||
assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType);
|
||||
assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType);
|
||||
List<List<String>> addressList = webhdfs.addresses;
|
||||
List<String> url = addressList.get(0);
|
||||
String addr = url.get(0);
|
||||
List<Map<String, String>> addressList = webhdfs.addresses;
|
||||
Map<String, String> url = addressList.get(0);
|
||||
String addr = url.get("uri");
|
||||
assertTrue(addr.contains("http"));
|
||||
assertTrue(addr.contains(":8020"));
|
||||
|
||||
@ -159,8 +154,9 @@ public static void validateEntry(ServiceRecord record) {
|
||||
nnipc.protocolType);
|
||||
|
||||
Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2);
|
||||
assertNotNull(ipc2);
|
||||
|
||||
Endpoint web = findEndpoint(record, "web", true, 1, 1);
|
||||
Endpoint web = findEndpoint(record, HTTP_API, true, 1, 1);
|
||||
assertEquals(1, web.addresses.size());
|
||||
assertEquals(1, web.addresses.get(0).size());
|
||||
}
|
||||
@ -275,14 +271,14 @@ public static ServiceRecord buildExampleServiceEntry(String persistence) throws
|
||||
public static void addSampleEndpoints(ServiceRecord entry, String hostname)
|
||||
throws URISyntaxException {
|
||||
assertNotNull(hostname);
|
||||
entry.addExternalEndpoint(webEndpoint("web",
|
||||
entry.addExternalEndpoint(webEndpoint(HTTP_API,
|
||||
new URI("http", hostname + ":80", "/")));
|
||||
entry.addExternalEndpoint(
|
||||
restEndpoint(API_WEBHDFS,
|
||||
new URI("http", hostname + ":8020", "/")));
|
||||
|
||||
Endpoint endpoint = ipcEndpoint(API_HDFS, true, null);
|
||||
endpoint.addresses.add(tuple(hostname, "8030"));
|
||||
Endpoint endpoint = ipcEndpoint(API_HDFS, null);
|
||||
endpoint.addresses.add(RegistryTypeUtils.hostnamePortPair(hostname, 8030));
|
||||
entry.addInternalEndpoint(endpoint);
|
||||
InetSocketAddress localhost = new InetSocketAddress("localhost", 8050);
|
||||
entry.addInternalEndpoint(
|
||||
@ -290,9 +286,7 @@ public static void addSampleEndpoints(ServiceRecord entry, String hostname)
|
||||
8050));
|
||||
entry.addInternalEndpoint(
|
||||
RegistryTypeUtils.ipcEndpoint(
|
||||
IPC2,
|
||||
true,
|
||||
RegistryTypeUtils.marshall(localhost)));
|
||||
IPC2, localhost));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -19,9 +19,9 @@
|
||||
package org.apache.hadoop.registry.client.binding;
|
||||
|
||||
import org.apache.hadoop.registry.RegistryTestHelper;
|
||||
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
|
||||
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
|
||||
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
@ -31,8 +31,6 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.EOFException;
|
||||
|
||||
/**
|
||||
* Test record marshalling
|
||||
*/
|
||||
@ -44,6 +42,7 @@ public class TestMarshalling extends RegistryTestHelper {
|
||||
public final Timeout testTimeout = new Timeout(10000);
|
||||
@Rule
|
||||
public TestName methodName = new TestName();
|
||||
|
||||
private static RegistryUtils.ServiceRecordMarshal marshal;
|
||||
|
||||
@BeforeClass
|
||||
@ -55,42 +54,65 @@ public static void setupClass() {
|
||||
public void testRoundTrip() throws Throwable {
|
||||
String persistence = PersistencePolicies.PERMANENT;
|
||||
ServiceRecord record = createRecord(persistence);
|
||||
record.set("customkey","customvalue");
|
||||
record.set("customkey2","customvalue2");
|
||||
record.set("customkey", "customvalue");
|
||||
record.set("customkey2", "customvalue2");
|
||||
RegistryTypeUtils.validateServiceRecord("", record);
|
||||
LOG.info(marshal.toJson(record));
|
||||
byte[] bytes = marshal.toBytes(record);
|
||||
ServiceRecord r2 = marshal.fromBytes("", bytes, 0);
|
||||
ServiceRecord r2 = marshal.fromBytes("", bytes);
|
||||
assertMatches(record, r2);
|
||||
RegistryTypeUtils.validateServiceRecord("", r2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRoundTripHeaders() throws Throwable {
|
||||
ServiceRecord record = createRecord(PersistencePolicies.CONTAINER);
|
||||
byte[] bytes = marshal.toByteswithHeader(record);
|
||||
ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
|
||||
assertMatches(record, r2);
|
||||
|
||||
@Test(expected = NoRecordException.class)
|
||||
public void testUnmarshallNoData() throws Throwable {
|
||||
marshal.fromBytes("src", new byte[]{});
|
||||
}
|
||||
|
||||
@Test(expected = NoRecordException.class)
|
||||
public void testRoundTripBadHeaders() throws Throwable {
|
||||
ServiceRecord record = createRecord(PersistencePolicies.APPLICATION);
|
||||
byte[] bytes = marshal.toByteswithHeader(record);
|
||||
bytes[1] = 0x01;
|
||||
marshal.fromBytesWithHeader("src", bytes);
|
||||
public void testUnmarshallNotEnoughData() throws Throwable {
|
||||
// this is nominally JSON -but without the service record header
|
||||
marshal.fromBytes("src", new byte[]{'{','}'}, ServiceRecord.RECORD_TYPE);
|
||||
}
|
||||
|
||||
@Test(expected = NoRecordException.class)
|
||||
public void testUnmarshallHeaderTooShort() throws Throwable {
|
||||
marshal.fromBytesWithHeader("src", new byte[]{'a'});
|
||||
}
|
||||
|
||||
@Test(expected = EOFException.class)
|
||||
@Test(expected = InvalidRecordException.class)
|
||||
public void testUnmarshallNoBody() throws Throwable {
|
||||
byte[] bytes = ServiceRecordHeader.getData();
|
||||
marshal.fromBytesWithHeader("src", bytes);
|
||||
byte[] bytes = "this is not valid JSON at all and should fail".getBytes();
|
||||
marshal.fromBytes("src", bytes);
|
||||
}
|
||||
|
||||
@Test(expected = InvalidRecordException.class)
|
||||
public void testUnmarshallWrongType() throws Throwable {
|
||||
byte[] bytes = "{'type':''}".getBytes();
|
||||
ServiceRecord serviceRecord = marshal.fromBytes("marshalling", bytes);
|
||||
RegistryTypeUtils.validateServiceRecord("validating", serviceRecord);
|
||||
}
|
||||
|
||||
@Test(expected = NoRecordException.class)
|
||||
public void testUnmarshallWrongLongType() throws Throwable {
|
||||
ServiceRecord record = new ServiceRecord();
|
||||
record.type = "ThisRecordHasALongButNonMatchingType";
|
||||
byte[] bytes = marshal.toBytes(record);
|
||||
ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
|
||||
bytes, ServiceRecord.RECORD_TYPE);
|
||||
}
|
||||
|
||||
@Test(expected = NoRecordException.class)
|
||||
public void testUnmarshallNoType() throws Throwable {
|
||||
ServiceRecord record = new ServiceRecord();
|
||||
record.type = "NoRecord";
|
||||
byte[] bytes = marshal.toBytes(record);
|
||||
ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
|
||||
bytes, ServiceRecord.RECORD_TYPE);
|
||||
}
|
||||
|
||||
@Test(expected = InvalidRecordException.class)
|
||||
public void testRecordValidationWrongType() throws Throwable {
|
||||
ServiceRecord record = new ServiceRecord();
|
||||
record.type = "NotAServiceRecordType";
|
||||
RegistryTypeUtils.validateServiceRecord("validating", record);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnknownFieldsRoundTrip() throws Throwable {
|
||||
@ -102,8 +124,8 @@ public void testUnknownFieldsRoundTrip() throws Throwable {
|
||||
assertEquals("2", record.get("intval"));
|
||||
assertNull(record.get("null"));
|
||||
assertEquals("defval", record.get("null", "defval"));
|
||||
byte[] bytes = marshal.toByteswithHeader(record);
|
||||
ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
|
||||
byte[] bytes = marshal.toBytes(record);
|
||||
ServiceRecord r2 = marshal.fromBytes("", bytes);
|
||||
assertEquals("value", r2.get("key"));
|
||||
assertEquals("2", r2.get("intval"));
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
import org.apache.hadoop.fs.PathNotFoundException;
|
||||
import org.apache.hadoop.registry.AbstractRegistryTest;
|
||||
import org.apache.hadoop.registry.client.api.BindFlags;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
|
||||
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
|
||||
@ -91,10 +92,8 @@ public void testLsParent() throws Throwable {
|
||||
childStats.values());
|
||||
assertEquals(1, records.size());
|
||||
ServiceRecord record = records.get(ENTRY_PATH);
|
||||
assertNotNull(record);
|
||||
record.validate();
|
||||
RegistryTypeUtils.validateServiceRecord(ENTRY_PATH, record);
|
||||
assertMatches(written, record);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -352,6 +352,10 @@ application.
|
||||
<td>Name</td>
|
||||
<td>Description</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>type: String</td>
|
||||
<td>Always: "JSONServiceRecord"</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>description: String</td>
|
||||
<td>Human-readable description.</td>
|
||||
@ -366,6 +370,8 @@ application.
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
The type field MUST be `"JSONServiceRecord"`. Mandating this string allows future record types *and* permits rapid rejection of byte arrays that lack this string before attempting JSON parsing.
|
||||
|
||||
### YARN Persistence policies
|
||||
|
||||
The YARN Resource Manager integration integrates cleanup of service records
|
||||
@ -379,7 +385,6 @@ any use of the registry without the RM's participation.
|
||||
The attributes, `yarn:id` and `yarn:persistence` specify which records
|
||||
*and any child entries* may be deleted as the associated YARN components complete.
|
||||
|
||||
|
||||
The `yarn:id` field defines the application, attempt or container ID to match;
|
||||
the `yarn:persistence` attribute defines the trigger for record cleanup, and
|
||||
implicitly the type of the contents of the `yarn:id` field.
|
||||
@ -432,31 +437,32 @@ up according the lifecycle of that application.
|
||||
<td>Description</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>addresses: List[List[String]]</td>
|
||||
<td>a list of address tuples whose format depends on the address type</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>addressType: String</td>
|
||||
<td>format of the binding</td>
|
||||
</tr>
|
||||
<td>api: URI as String</td>
|
||||
<td>API implemented at the end of the binding</td>
|
||||
<tr>
|
||||
<td>protocol: String</td>
|
||||
<td>Protocol. Examples:
|
||||
`http`, `https`, `hadoop-rpc`, `zookeeper`, `web`, `REST`, `SOAP`, ...</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>api: String</td>
|
||||
<td>API implemented at the end of the binding</td>
|
||||
<td>addressType: String</td>
|
||||
<td>format of the binding</td>
|
||||
</tr>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>addresses: List[Map[String, String]]</td>
|
||||
<td>a list of address maps</td>
|
||||
</tr>
|
||||
|
||||
</table>
|
||||
|
||||
|
||||
All string fields have a limit on size, to dissuade services from hiding
|
||||
complex JSON structures in the text description.
|
||||
|
||||
### Field: Address Type
|
||||
#### Field `addressType`: Address Type
|
||||
|
||||
The addressType field defines the string format of entries.
|
||||
The `addressType` field defines the string format of entries.
|
||||
|
||||
Having separate types is that tools (such as a web viewer) can process binding
|
||||
strings without having to recognize the protocol.
|
||||
@ -467,43 +473,58 @@ strings without having to recognize the protocol.
|
||||
<td>binding format</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`url`</td>
|
||||
<td>`[URL]`</td>
|
||||
<td>uri</td>
|
||||
<td>uri:URI of endpoint</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`hostname`</td>
|
||||
<td>`[hostname]`</td>
|
||||
<td>hostname</td>
|
||||
<td>hostname: service host</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`inetaddress`</td>
|
||||
<td>`[hostname, port]`</td>
|
||||
<td>inetaddress</td>
|
||||
<td>hostname: service host, port: service port</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`path`</td>
|
||||
<td>`[/path/to/something]`</td>
|
||||
<td>path</td>
|
||||
<td>path: generic unix filesystem path</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`zookeeper`</td>
|
||||
<td>`[quorum-entry, path]`</td>
|
||||
<td>zookeeper</td>
|
||||
<td>hostname: service host, port: service port, path: ZK path</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
|
||||
An actual zookeeper binding consists of a list of `hostname:port` bindings –the
|
||||
quorum— and the path within. In the proposed schema, every quorum entry will be
|
||||
listed as a triple of `[hostname, port, path]`. Client applications do not
|
||||
expect the path to de be different across the quorum. The first entry in the
|
||||
list of quorum hosts MUST define the path to be used by all clients. Later
|
||||
entries SHOULD list the same path, though clients MUST ignore these.
|
||||
In the zookeeper binding, every entry represents a single node in quorum,
|
||||
the `hostname` and `port` fields defining the hostname of the ZK instance
|
||||
and the port on which it is listening. The `path` field lists zookeeper path
|
||||
for applications to use. For example, for HBase this would refer to the znode
|
||||
containing information about the HBase cluster.
|
||||
|
||||
The path MUST be identical across all address elements in the `addresses` list.
|
||||
This ensures that any single address contains enough information to connect
|
||||
to the quorum and connect to the relevant znode.
|
||||
|
||||
New Address types may be defined; if not standard please prefix with the
|
||||
character sequence `"x-"`.
|
||||
|
||||
#### **Field: API**
|
||||
### Field `api`: API identifier
|
||||
|
||||
The API field MUST contain a URI that identifies the specific API of an endpoint.
|
||||
These MUST be unique to an API to avoid confusion.
|
||||
|
||||
The following strategies are suggested to provide unique URIs for an API
|
||||
|
||||
1. The SOAP/WS-* convention of using the URL to where the WSDL defining the service
|
||||
2. A URL to the svn/git hosted document defining a REST API
|
||||
3. the `classpath` schema followed by a path to a class or package in an application.
|
||||
4. The `uuid` schema with a generated UUID.
|
||||
|
||||
It is hoped that standard API URIs will be defined for common APIs. Two such non-normative APIs are used in this document
|
||||
|
||||
* `http://` : A web site for humans
|
||||
* `classpath:javax.management.jmx`: and endpoint supporting the JMX management protocol (RMI-based)
|
||||
|
||||
APIs may be unique to a service class, or may be common across by service
|
||||
classes. They MUST be given unique names. These MAY be based on service
|
||||
packages but MAY be derived from other naming schemes:
|
||||
|
||||
### Examples of Service Entries
|
||||
|
||||
@ -524,12 +545,14 @@ overall application. It exports the URL to a load balancer.
|
||||
|
||||
{
|
||||
"description" : "tomcat-based web application",
|
||||
"registrationTime" : 1408638082444,
|
||||
"external" : [ {
|
||||
"api" : "www",
|
||||
"api" : "http://internal.example.org/restapis/scheduler/20141026v1",
|
||||
"addressType" : "uri",
|
||||
"protocolType" : "REST",
|
||||
"addresses" : [ [ "http://loadbalancer/" ] [ "http://loadbalancer2/" ] ]
|
||||
"protocol" : "REST",
|
||||
"addresses" : [
|
||||
{ "uri" : "http://loadbalancer/" },
|
||||
{ "uri" : "http://loadbalancer2/" }
|
||||
]
|
||||
} ],
|
||||
"internal" : [ ]
|
||||
}
|
||||
@ -545,21 +568,23 @@ will trigger the deletion of this entry
|
||||
/users/devteam/org-apache-tomcat/test1/components/container-1408631738011-0001-01-000001
|
||||
|
||||
{
|
||||
"registrationTime" : 1408638082445,
|
||||
"yarn:id" : "container_1408631738011_0001_01_000001",
|
||||
"yarn:persistence" : "3",
|
||||
"description" : null,
|
||||
"yarn:persistence" : "container",
|
||||
"description" : "",
|
||||
"external" : [ {
|
||||
"api" : "www",
|
||||
"api" : "http://internal.example.org/restapis/scheduler/20141026v1",
|
||||
"addressType" : "uri",
|
||||
"protocolType" : "REST",
|
||||
"addresses" : [ [ "http://rack4server3:43572" ] ]
|
||||
"protocol" : "REST",
|
||||
"addresses" : [{ "uri" : "rack4server3:43572" } ]
|
||||
} ],
|
||||
"internal" : [ {
|
||||
"api" : "jmx",
|
||||
"api" : "classpath:javax.management.jmx",
|
||||
"addressType" : "host/port",
|
||||
"protocolType" : "JMX",
|
||||
"addresses" : [ [ "rack4server3", "43573" ] ]
|
||||
"protocol" : "rmi",
|
||||
"addresses" : [ {
|
||||
"host" : "rack4server3",
|
||||
"port" : "48551"
|
||||
} ]
|
||||
} ]
|
||||
}
|
||||
|
||||
@ -571,19 +596,22 @@ external endpoint, the JMX addresses as internal.
|
||||
{
|
||||
"registrationTime" : 1408638082445,
|
||||
"yarn:id" : "container_1408631738011_0001_01_000002",
|
||||
"yarn:persistence" : "3",
|
||||
"yarn:persistence" : "container",
|
||||
"description" : null,
|
||||
"external" : [ {
|
||||
"api" : "www",
|
||||
"api" : "http://internal.example.org/restapis/scheduler/20141026v1",
|
||||
"addressType" : "uri",
|
||||
"protocolType" : "REST",
|
||||
"protocol" : "REST",
|
||||
"addresses" : [ [ "http://rack1server28:35881" ] ]
|
||||
} ],
|
||||
"internal" : [ {
|
||||
"api" : "jmx",
|
||||
"api" : "classpath:javax.management.jmx",
|
||||
"addressType" : "host/port",
|
||||
"protocolType" : "JMX",
|
||||
"addresses" : [ [ "rack1server28", "35882" ] ]
|
||||
"protocol" : "rmi",
|
||||
"addresses" : [ {
|
||||
"host" : "rack1server28",
|
||||
"port" : "48551"
|
||||
} ]
|
||||
} ]
|
||||
}
|
||||
|
||||
@ -887,3 +915,106 @@ Implementations may throttle update operations.
|
||||
**Rate of Polling**
|
||||
|
||||
Clients which poll the registry may be throttled.
|
||||
|
||||
# Complete service record example
|
||||
|
||||
Below is a (non-normative) example of a service record retrieved
|
||||
from a YARN application.
|
||||
|
||||
|
||||
{
|
||||
"type" : "JSONServiceRecord",
|
||||
"description" : "Slider Application Master",
|
||||
"yarn:persistence" : "application",
|
||||
"yarn:id" : "application_1414052463672_0028",
|
||||
"external" : [ {
|
||||
"api" : "classpath:org.apache.slider.appmaster",
|
||||
"addressType" : "host/port",
|
||||
"protocol" : "hadoop/IPC",
|
||||
"addresses" : [ {
|
||||
"port" : "48551",
|
||||
"host" : "nn.example.com"
|
||||
} ]
|
||||
}, {
|
||||
"api" : "http://",
|
||||
"addressType" : "uri",
|
||||
"protocol" : "web",
|
||||
"addresses" : [ {
|
||||
"uri" : "http://nn.example.com:40743"
|
||||
} ]
|
||||
}, {
|
||||
"api" : "classpath:org.apache.slider.management",
|
||||
"addressType" : "uri",
|
||||
"protocol" : "REST",
|
||||
"addresses" : [ {
|
||||
"uri" : "http://nn.example.com:40743/ws/v1/slider/mgmt"
|
||||
} ]
|
||||
}, {
|
||||
"api" : "classpath:org.apache.slider.publisher",
|
||||
"addressType" : "uri",
|
||||
"protocol" : "REST",
|
||||
"addresses" : [ {
|
||||
"uri" : "http://nn.example.com:40743/ws/v1/slider/publisher"
|
||||
} ]
|
||||
}, {
|
||||
"api" : "classpath:org.apache.slider.registry",
|
||||
"addressType" : "uri",
|
||||
"protocol" : "REST",
|
||||
"addresses" : [ {
|
||||
"uri" : "http://nn.example.com:40743/ws/v1/slider/registry"
|
||||
} ]
|
||||
}, {
|
||||
"api" : "classpath:org.apache.slider.publisher.configurations",
|
||||
"addressType" : "uri",
|
||||
"protocol" : "REST",
|
||||
"addresses" : [ {
|
||||
"uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/slider"
|
||||
} ]
|
||||
}, {
|
||||
"api" : "classpath:org.apache.slider.publisher.exports",
|
||||
"addressType" : "uri",
|
||||
"protocol" : "REST",
|
||||
"addresses" : [ {
|
||||
"uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/exports"
|
||||
} ]
|
||||
} ],
|
||||
"internal" : [ {
|
||||
"api" : "classpath:org.apache.slider.agents.secure",
|
||||
"addressType" : "uri",
|
||||
"protocol" : "REST",
|
||||
"addresses" : [ {
|
||||
"uri" : "https://nn.example.com:52705/ws/v1/slider/agents"
|
||||
} ]
|
||||
}, {
|
||||
"api" : "classpath:org.apache.slider.agents.oneway",
|
||||
"addressType" : "uri",
|
||||
"protocol" : "REST",
|
||||
"addresses" : [ {
|
||||
"uri" : "https://nn.example.com:33425/ws/v1/slider/agents"
|
||||
} ]
|
||||
} ]
|
||||
}
|
||||
|
||||
It publishes a number of endpoints, both internal and external.
|
||||
|
||||
External:
|
||||
|
||||
1. The IPC hostname and port for client-AM communications
|
||||
1. URL to the AM's web UI
|
||||
1. A series of REST URLs under the web UI for specific application services.
|
||||
The details are irrelevant —note that they use an application-specific API
|
||||
value to ensure uniqueness.
|
||||
|
||||
Internal:
|
||||
1. Two URLS to REST APIs offered by the AM for containers deployed by
|
||||
the application itself.
|
||||
|
||||
Python agents running in the containers retrieve the internal endpoint
|
||||
URLs to communicate with their AM. The record is resolved on container startup
|
||||
and cached until communications problems occur. At that point the registry is
|
||||
queried for the current record, then an attempt is made to reconnect to the AM.
|
||||
|
||||
Here "connectivity" problems means both "low level socket/IO errors" and
|
||||
"failures in HTTPS authentication". The agents use two-way HTTPS authentication
|
||||
—if the AM fails and another application starts listening on the same ports
|
||||
it will trigger an authentication failure and hence service record reread.
|
||||
|
Loading…
Reference in New Issue
Block a user