YARN-1732. Changed types of related-entities and primary-filters in the timeline-service to be sets instead of maps. Contributed by Billie Rinaldi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1570914 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-22 20:46:54 +00:00
parent e69614d650
commit e06226126c
6 changed files with 198 additions and 108 deletions

View File

@ -218,6 +218,9 @@ Release 2.4.0 - UNRELEASED
YARN-1470. Add audience annotations to MiniYARNCluster. (Anubhav Dhoot
via kasha)
YARN-1732. Changed types of related-entities and primary-filters in the
timeline-service to be sets instead of maps. (Billie Rinaldi via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -20,9 +20,11 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@ -56,10 +58,10 @@ public class ATSEntity implements Comparable<ATSEntity> {
private String entityId;
private Long startTime;
private List<ATSEvent> events = new ArrayList<ATSEvent>();
private Map<String, List<String>> relatedEntities =
new HashMap<String, List<String>>();
private Map<String, Object> primaryFilters =
new HashMap<String, Object>();
private Map<String, Set<String>> relatedEntities =
new HashMap<String, Set<String>>();
private Map<String, Set<Object>> primaryFilters =
new HashMap<String, Set<Object>>();
private Map<String, Object> otherInfo =
new HashMap<String, Object>();
@ -173,7 +175,7 @@ public void setEvents(List<ATSEvent> events) {
* @return the related entities
*/
@XmlElement(name = "relatedentities")
public Map<String, List<String>> getRelatedEntities() {
public Map<String, Set<String>> getRelatedEntities() {
return relatedEntities;
}
@ -186,9 +188,9 @@ public Map<String, List<String>> getRelatedEntities() {
* the entity Id
*/
public void addRelatedEntity(String entityType, String entityId) {
List<String> thisRelatedEntity = relatedEntities.get(entityType);
Set<String> thisRelatedEntity = relatedEntities.get(entityType);
if (thisRelatedEntity == null) {
thisRelatedEntity = new ArrayList<String>();
thisRelatedEntity = new HashSet<String>();
relatedEntities.put(entityType, thisRelatedEntity);
}
thisRelatedEntity.add(entityId);
@ -200,10 +202,10 @@ public void addRelatedEntity(String entityType, String entityId) {
* @param relatedEntities
* a map of related entities
*/
public void addRelatedEntities(Map<String, List<String>> relatedEntities) {
for (Entry<String, List<String>> relatedEntity :
public void addRelatedEntities(Map<String, Set<String>> relatedEntities) {
for (Entry<String, Set<String>> relatedEntity :
relatedEntities.entrySet()) {
List<String> thisRelatedEntity =
Set<String> thisRelatedEntity =
this.relatedEntities.get(relatedEntity.getKey());
if (thisRelatedEntity == null) {
this.relatedEntities.put(
@ -221,7 +223,7 @@ public void addRelatedEntities(Map<String, List<String>> relatedEntities) {
* a map of related entities
*/
public void setRelatedEntities(
Map<String, List<String>> relatedEntities) {
Map<String, Set<String>> relatedEntities) {
this.relatedEntities = relatedEntities;
}
@ -231,7 +233,7 @@ public void setRelatedEntities(
* @return the primary filters
*/
@XmlElement(name = "primaryfilters")
public Map<String, Object> getPrimaryFilters() {
public Map<String, Set<Object>> getPrimaryFilters() {
return primaryFilters;
}
@ -244,7 +246,12 @@ public Map<String, Object> getPrimaryFilters() {
* the primary filter value
*/
public void addPrimaryFilter(String key, Object value) {
primaryFilters.put(key, value);
Set<Object> thisPrimaryFilter = primaryFilters.get(key);
if (thisPrimaryFilter == null) {
thisPrimaryFilter = new HashSet<Object>();
primaryFilters.put(key, thisPrimaryFilter);
}
thisPrimaryFilter.add(value);
}
/**
@ -253,8 +260,18 @@ public void addPrimaryFilter(String key, Object value) {
* @param primaryFilters
* a map of primary filters
*/
public void addPrimaryFilters(Map<String, Object> primaryFilters) {
this.primaryFilters.putAll(primaryFilters);
public void addPrimaryFilters(Map<String, Set<Object>> primaryFilters) {
for (Entry<String, Set<Object>> primaryFilter :
primaryFilters.entrySet()) {
Set<Object> thisPrimaryFilter =
this.primaryFilters.get(primaryFilter.getKey());
if (thisPrimaryFilter == null) {
this.primaryFilters.put(
primaryFilter.getKey(), primaryFilter.getValue());
} else {
thisPrimaryFilter.addAll(primaryFilter.getValue());
}
}
}
/**
@ -263,7 +280,7 @@ public void addPrimaryFilters(Map<String, Object> primaryFilters) {
* @param primaryFilters
* a map of primary filters
*/
public void setPrimaryFilters(Map<String, Object> primaryFilters) {
public void setPrimaryFilters(Map<String, Set<Object>> primaryFilters) {
this.primaryFilters = primaryFilters;
}

View File

@ -17,11 +17,6 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableUtils;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@ -31,7 +26,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableUtils;
import org.codehaus.jackson.map.ObjectMapper;
/**
* A utility class providing methods for serializing and deserializing
@ -132,9 +131,24 @@ public static void write(DataOutputStream dos, Object o)
* @throws IOException
*/
public static Object read(byte[] b) throws IOException {
if (b == null || b.length == 0)
return read(b, 0);
}
/**
* Deserializes an Object from a byte array at a specified offset, assuming
* the bytes were created with {@link #write(Object)}.
*
* @param b A byte array
* @param offset Offset into the array
* @return An Object
* @throws IOException
*/
public static Object read(byte[] b, int offset) throws IOException {
if (b == null || b.length == 0) {
return null;
ByteArrayInputStream bais = new ByteArrayInputStream(b);
}
ByteArrayInputStream bais = new ByteArrayInputStream(b, offset,
b.length - offset);
return read(new DataInputStream(bais));
}

View File

@ -259,14 +259,12 @@ private static ATSEntity getEntity(String entity, String entityType,
boolean relatedEntities = false;
if (fields.contains(Field.RELATED_ENTITIES)) {
relatedEntities = true;
atsEntity.setRelatedEntities(new HashMap<String, List<String>>());
} else {
atsEntity.setRelatedEntities(null);
}
boolean primaryFilters = false;
if (fields.contains(Field.PRIMARY_FILTERS)) {
primaryFilters = true;
atsEntity.setPrimaryFilters(new HashMap<String, Object>());
} else {
atsEntity.setPrimaryFilters(null);
}
@ -286,9 +284,8 @@ private static ATSEntity getEntity(String entity, String entityType,
break;
if (key[prefixlen] == PRIMARY_FILTER_COLUMN[0]) {
if (primaryFilters) {
atsEntity.addPrimaryFilter(parseRemainingKey(key,
prefixlen + PRIMARY_FILTER_COLUMN.length),
GenericObjectMapper.read(iterator.peekNext().getValue()));
addPrimaryFilter(atsEntity, key,
prefixlen + PRIMARY_FILTER_COLUMN.length);
}
} else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
if (otherInfo) {
@ -507,9 +504,14 @@ private ATSEntities getEntityByTime(byte[] base,
if (secondaryFilters != null) {
for (NameValuePair filter : secondaryFilters) {
Object v = atsEntity.getOtherInfo().get(filter.getName());
if (v == null)
v = atsEntity.getPrimaryFilters().get(filter.getName());
if (v == null || !v.equals(filter.getValue())) {
if (v == null) {
Set<Object> vs = atsEntity.getPrimaryFilters()
.get(filter.getName());
if (vs != null && !vs.contains(filter.getValue())) {
filterPassed = false;
break;
}
} else if (!v.equals(filter.getValue())) {
filterPassed = false;
break;
}
@ -547,7 +549,7 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
return;
}
Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0);
Map<String, Object> primaryFilters = atsEntity.getPrimaryFilters();
Map<String, Set<Object>> primaryFilters = atsEntity.getPrimaryFilters();
// write event entries
if (events != null && !events.isEmpty()) {
@ -563,10 +565,10 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
}
// write related entity entries
Map<String,List<String>> relatedEntities =
Map<String, Set<String>> relatedEntities =
atsEntity.getRelatedEntities();
if (relatedEntities != null && !relatedEntities.isEmpty()) {
for (Entry<String, List<String>> relatedEntityList :
for (Entry<String, Set<String>> relatedEntityList :
relatedEntities.entrySet()) {
String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) {
@ -595,12 +597,16 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
// write primary filter entries
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Object> primaryFilter : primaryFilters.entrySet()) {
byte[] key = createPrimaryFilterKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, primaryFilter.getKey());
byte[] value = GenericObjectMapper.write(primaryFilter.getValue());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
for (Entry<String, Set<Object>> primaryFilter :
primaryFilters.entrySet()) {
for (Object primaryFilterValue : primaryFilter.getValue()) {
byte[] key = createPrimaryFilterKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime,
primaryFilter.getKey(), primaryFilterValue);
writeBatch.put(key, EMPTY_BYTES);
writePrimaryFilterEntries(writeBatch, primaryFilters, key,
EMPTY_BYTES);
}
}
}
@ -634,12 +640,14 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
* write additional entries to the db for each primary filter.
*/
private static void writePrimaryFilterEntries(WriteBatch writeBatch,
Map<String, Object> primaryFilters, byte[] key, byte[] value)
Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
throws IOException {
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Object> p : primaryFilters.entrySet()) {
writeBatch.put(addPrimaryFilterToKey(p.getKey(), p.getValue(),
key), value);
for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
for (Object pfval : pf.getValue()) {
writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval,
key), value);
}
}
}
}
@ -790,13 +798,26 @@ private static ATSEvent getEntityEvent(Set<String> eventTypes, byte[] key,
/**
* Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
* entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name.
* entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name + value.
*/
private static byte[] createPrimaryFilterKey(String entity,
String entitytype, byte[] revStartTime, String name) throws IOException {
String entitytype, byte[] revStartTime, String name, Object value)
throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
.add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name)
.getBytes();
.add(GenericObjectMapper.write(value)).getBytes();
}
/**
* Parses the primary filter from the given key at the given offset and
* adds it to the given entity.
*/
private static void addPrimaryFilter(ATSEntity atsEntity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String name = kp.getNextString();
Object value = GenericObjectMapper.read(key, kp.getOffset());
atsEntity.addPrimaryFilter(name, value);
}
/**

View File

@ -91,7 +91,7 @@ public ATSEntities getEntities(String entityType, Long limit,
continue;
}
if (primaryFilter != null &&
!matchFilter(entity.getPrimaryFilters(), primaryFilter)) {
!matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // OR logic
@ -236,7 +236,7 @@ public ATSPutErrors put(ATSEntities data) {
if (entity.getRelatedEntities() == null) {
continue;
}
for (Map.Entry<String, List<String>> partRelatedEntities : entity
for (Map.Entry<String, Set<String>> partRelatedEntities : entity
.getRelatedEntities().entrySet()) {
if (partRelatedEntities == null) {
continue;
@ -293,4 +293,14 @@ private static boolean matchFilter(Map<String, Object> tags,
return true;
}
private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
NameValuePair filter) {
Set<Object> value = tags.get(filter.getName());
if (value == null) { // doesn't have the filter
return false;
} else {
return value.contains(filter.getValue());
}
}
}

View File

@ -21,16 +21,17 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@ -44,40 +45,48 @@
public class ApplicationTimelineStoreTestUtils {
private static final Map<String, Object> EMPTY_MAP = Collections.emptyMap();
private static final Map<String, List<String>> EMPTY_REL_ENTITIES =
new HashMap<String, List<String>>();
protected static final Map<String, Object> EMPTY_MAP =
Collections.emptyMap();
protected static final Map<String, Set<Object>> EMPTY_PRIMARY_FILTERS =
Collections.emptyMap();
protected static final Map<String, Set<String>> EMPTY_REL_ENTITIES =
Collections.emptyMap();
protected ApplicationTimelineStore store;
private String entity1;
private String entityType1;
private String entity1b;
private String entity2;
private String entityType2;
private Map<String, Object> primaryFilters;
private Map<String, Object> secondaryFilters;
private Map<String, Object> allFilters;
private Map<String, Object> otherInfo;
private Map<String, List<String>> relEntityMap;
private NameValuePair userFilter;
private Collection<NameValuePair> goodTestingFilters;
private Collection<NameValuePair> badTestingFilters;
private ATSEvent ev1;
private ATSEvent ev2;
private ATSEvent ev3;
private ATSEvent ev4;
private Map<String, Object> eventInfo;
private List<ATSEvent> events1;
private List<ATSEvent> events2;
protected String entity1;
protected String entityType1;
protected String entity1b;
protected String entity2;
protected String entityType2;
protected Map<String, Set<Object>> primaryFilters;
protected Map<String, Object> secondaryFilters;
protected Map<String, Object> allFilters;
protected Map<String, Object> otherInfo;
protected Map<String, Set<String>> relEntityMap;
protected NameValuePair userFilter;
protected Collection<NameValuePair> goodTestingFilters;
protected Collection<NameValuePair> badTestingFilters;
protected ATSEvent ev1;
protected ATSEvent ev2;
protected ATSEvent ev3;
protected ATSEvent ev4;
protected Map<String, Object> eventInfo;
protected List<ATSEvent> events1;
protected List<ATSEvent> events2;
/**
* Load test data into the given store
*/
protected void loadTestData() throws IOException {
ATSEntities atsEntities = new ATSEntities();
Map<String, Object> primaryFilters = new HashMap<String, Object>();
primaryFilters.put("user", "username");
primaryFilters.put("appname", 12345l);
Map<String, Set<Object>> primaryFilters =
new HashMap<String, Set<Object>>();
Set<Object> l1 = new HashSet<Object>();
l1.add("username");
Set<Object> l2 = new HashSet<Object>();
l2.add(12345l);
primaryFilters.put("user", l1);
primaryFilters.put("appname", l2);
Map<String, Object> secondaryFilters = new HashMap<String, Object>();
secondaryFilters.put("startTime", 123456l);
secondaryFilters.put("status", "RUNNING");
@ -91,9 +100,9 @@ protected void loadTestData() throws IOException {
String entity2 = "id_2";
String entityType2 = "type_2";
Map<String, List<String>> relatedEntities =
new HashMap<String, List<String>>();
relatedEntities.put(entityType2, Collections.singletonList(entity2));
Map<String, Set<String>> relatedEntities =
new HashMap<String, Set<String>>();
relatedEntities.put(entityType2, Collections.singleton(entity2));
ATSEvent ev3 = createEvent(789l, "launch_event", null);
ATSEvent ev4 = createEvent(-123l, "init_event", null);
@ -156,15 +165,23 @@ protected void loadVerificationData() throws Exception {
badTestingFilters.add(new NameValuePair("appname", 12345l));
badTestingFilters.add(new NameValuePair("status", "FINISHED"));
primaryFilters = new HashMap<String, Object>();
primaryFilters.put("user", "username");
primaryFilters.put("appname", 12345l);
primaryFilters = new HashMap<String, Set<Object>>();
Set<Object> l1 = new HashSet<Object>();
l1.add("username");
Set<Object> l2 = new HashSet<Object>();
l2.add(12345l);
primaryFilters.put("user", l1);
primaryFilters.put("appname", l2);
secondaryFilters = new HashMap<String, Object>();
secondaryFilters.put("startTime", 123456l);
secondaryFilters.put("status", "RUNNING");
allFilters = new HashMap<String, Object>();
allFilters.putAll(secondaryFilters);
allFilters.putAll(primaryFilters);
for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
for (Object o : pf.getValue()) {
allFilters.put(pf.getKey(), o);
}
}
otherInfo = new HashMap<String, Object>();
otherInfo.put("info1", "val1");
otherInfo.put("info2", "val2");
@ -186,8 +203,8 @@ protected void loadVerificationData() throws Exception {
events1.add(ev1);
relEntityMap =
new HashMap<String, List<String>>();
List<String> ids = new ArrayList<String>();
new HashMap<String, Set<String>>();
Set<String> ids = new HashSet<String>();
ids.add(entity1);
ids.add(entity1b);
relEntityMap.put(entityType1, ids);
@ -212,8 +229,8 @@ public void testGetSingleEntity() throws IOException {
primaryFilters, otherInfo, store.getEntity(entity1b, entityType1,
EnumSet.allOf(Field.class)));
verifyEntityInfo(entity2, entityType2, events2, relEntityMap, EMPTY_MAP,
EMPTY_MAP, store.getEntity(entity2, entityType2,
verifyEntityInfo(entity2, entityType2, events2, relEntityMap,
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, store.getEntity(entity2, entityType2,
EnumSet.allOf(Field.class)));
// test getting single fields
@ -267,8 +284,8 @@ public void testGetEntities() throws IOException {
entities = store.getEntities("type_2", null, null, null, null, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entity2, entityType2, events2, relEntityMap, EMPTY_MAP,
EMPTY_MAP, entities.get(0));
verifyEntityInfo(entity2, entityType2, events2, relEntityMap,
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0));
entities = store.getEntities("type_1", 1l, null, null, null, null,
EnumSet.allOf(Field.class)).getEntities();
@ -457,9 +474,9 @@ public void testGetEvents() throws IOException {
/**
* Verify a single entity
*/
private static void verifyEntityInfo(String entity, String entityType,
List<ATSEvent> events, Map<String, List<String>> relatedEntities,
Map<String, Object> primaryFilters, Map<String, Object> otherInfo,
protected static void verifyEntityInfo(String entity, String entityType,
List<ATSEvent> events, Map<String, Set<String>> relatedEntities,
Map<String, Set<Object>> primaryFilters, Map<String, Object> otherInfo,
ATSEntity retrievedEntityInfo) {
if (entity == null) {
assertNull(retrievedEntityInfo);
@ -467,23 +484,27 @@ private static void verifyEntityInfo(String entity, String entityType,
}
assertEquals(entity, retrievedEntityInfo.getEntityId());
assertEquals(entityType, retrievedEntityInfo.getEntityType());
if (events == null)
if (events == null) {
assertNull(retrievedEntityInfo.getEvents());
else
} else {
assertEquals(events, retrievedEntityInfo.getEvents());
if (relatedEntities == null)
}
if (relatedEntities == null) {
assertNull(retrievedEntityInfo.getRelatedEntities());
else
} else {
assertEquals(relatedEntities, retrievedEntityInfo.getRelatedEntities());
if (primaryFilters == null)
}
if (primaryFilters == null) {
assertNull(retrievedEntityInfo.getPrimaryFilters());
else
} else {
assertTrue(primaryFilters.equals(
retrievedEntityInfo.getPrimaryFilters()));
if (otherInfo == null)
}
if (otherInfo == null) {
assertNull(retrievedEntityInfo.getOtherInfo());
else
} else {
assertTrue(otherInfo.equals(retrievedEntityInfo.getOtherInfo()));
}
}
/**
@ -503,21 +524,25 @@ private static void verifyEntityTimeline(
/**
* Create a test entity
*/
private static ATSEntity createEntity(String entity, String entityType,
protected static ATSEntity createEntity(String entity, String entityType,
Long startTime, List<ATSEvent> events,
Map<String, List<String>> relatedEntities,
Map<String, Object> primaryFilters, Map<String, Object> otherInfo) {
Map<String, Set<String>> relatedEntities,
Map<String, Set<Object>> primaryFilters,
Map<String, Object> otherInfo) {
ATSEntity atsEntity = new ATSEntity();
atsEntity.setEntityId(entity);
atsEntity.setEntityType(entityType);
atsEntity.setStartTime(startTime);
atsEntity.setEvents(events);
if (relatedEntities != null)
for (Entry<String, List<String>> e : relatedEntities.entrySet())
for (String v : e.getValue())
if (relatedEntities != null) {
for (Entry<String, Set<String>> e : relatedEntities.entrySet()) {
for (String v : e.getValue()) {
atsEntity.addRelatedEntity(e.getKey(), v);
else
}
}
} else {
atsEntity.setRelatedEntities(null);
}
atsEntity.setPrimaryFilters(primaryFilters);
atsEntity.setOtherInfo(otherInfo);
return atsEntity;