From 96578f0e01ba751175d4bcbad48d6f679e662382 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 5 Feb 2014 00:32:02 +0000 Subject: [PATCH] YARN-1634. Added a testable in-memory implementation of ApplicationTimelineStore. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1564583 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 5 + .../api/records/apptimeline/ATSEntity.java | 90 ++- .../apptimeline/EntityId.java | 100 ++++ .../MemoryApplicationTimelineStore.java | 288 ++++++++++ .../ApplicationTimelineStoreTestUtils.java | 532 ++++++++++++++++++ .../TestMemoryApplicationTimelineStore.java | 73 +++ 6 files changed, 1087 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityId.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8e8095dfb3..a9af3e0121 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -90,6 +90,9 @@ Release 2.4.0 - UNRELEASED implementing different storage impls for storing timeline information. (Billie Rinaldi via vinodkv) + YARN-1634. Added a testable in-memory implementation of + ApplicationTimelineStore. (Zhijie Shen via vinodkv) + IMPROVEMENTS YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via @@ -126,6 +129,8 @@ Release 2.4.0 - UNRELEASED be available across RM failover by making using of a remote configuration-provider. (Xuan Gong via vinodkv) + OPTIMIZATIONS + BUG FIXES YARN-935. Correcting pom.xml to build applicationhistoryserver module diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSEntity.java index 6b3ea1013e..709c79568e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSEntity.java @@ -50,7 +50,7 @@ @XmlAccessorType(XmlAccessType.NONE) @Public @Unstable -public class ATSEntity { +public class ATSEntity implements Comparable { private String entityType; private String entityId; @@ -310,4 +310,92 @@ public void setOtherInfo(Map otherInfo) { this.otherInfo = otherInfo; } + @Override + public int hashCode() { + // generated by eclipse + final int prime = 31; + int result = 1; + result = prime * result + ((entityId == null) ? 0 : entityId.hashCode()); + result = + prime * result + ((entityType == null) ? 0 : entityType.hashCode()); + result = prime * result + ((events == null) ? 0 : events.hashCode()); + result = prime * result + ((otherInfo == null) ? 0 : otherInfo.hashCode()); + result = + prime * result + + ((primaryFilters == null) ? 0 : primaryFilters.hashCode()); + result = + prime * result + + ((relatedEntities == null) ? 0 : relatedEntities.hashCode()); + result = prime * result + ((startTime == null) ? 0 : startTime.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + // generated by eclipse + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ATSEntity other = (ATSEntity) obj; + if (entityId == null) { + if (other.entityId != null) + return false; + } else if (!entityId.equals(other.entityId)) + return false; + if (entityType == null) { + if (other.entityType != null) + return false; + } else if (!entityType.equals(other.entityType)) + return false; + if (events == null) { + if (other.events != null) + return false; + } else if (!events.equals(other.events)) + return false; + if (otherInfo == null) { + if (other.otherInfo != null) + return false; + } else if (!otherInfo.equals(other.otherInfo)) + return false; + if (primaryFilters == null) { + if (other.primaryFilters != null) + return false; + } else if (!primaryFilters.equals(other.primaryFilters)) + return false; + if (relatedEntities == null) { + if (other.relatedEntities != null) + return false; + } else if (!relatedEntities.equals(other.relatedEntities)) + return false; + if (startTime == null) { + if (other.startTime != null) + return false; + } else if (!startTime.equals(other.startTime)) + return false; + return true; + } + + @Override + public int compareTo(ATSEntity other) { + int comparison = entityType.compareTo(other.entityType); + if (comparison == 0) { + long thisStartTime = + startTime == null ? Long.MIN_VALUE : startTime; + long otherStartTime = + other.startTime == null ? Long.MIN_VALUE : other.startTime; + if (thisStartTime > otherStartTime) { + return -1; + } else if (thisStartTime < otherStartTime) { + return 1; + } else { + return entityId.compareTo(other.entityId); + } + } else { + return comparison; + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityId.java new file mode 100644 index 0000000000..26431f8756 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityId.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * The unique identifier for an entity + */ +@Private +@Unstable +public class EntityId implements Comparable { + + private String id; + private String type; + + public EntityId(String id, String type) { + this.id = id; + this.type = type; + } + + /** + * Get the entity Id. + * @return The entity Id. + */ + public String getId() { + return id; + } + + /** + * Get the entity type. + * @return The entity type. + */ + public String getType() { + return type; + } + + @Override + public int compareTo(EntityId other) { + int c = type.compareTo(other.type); + if (c != 0) return c; + return id.compareTo(other.id); + } + + @Override + public int hashCode() { + // generated by eclipse + final int prime = 31; + int result = 1; + result = prime * result + ((id == null) ? 0 : id.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + // generated by eclipse + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + EntityId other = (EntityId) obj; + if (id == null) { + if (other.id != null) + return false; + } else if (!id.equals(other.id)) + return false; + if (type == null) { + if (other.type != null) + return false; + } else if (!type.equals(other.type)) + return false; + return true; + } + + @Override + public String toString() { + return "{ id: " + id + ", type: "+ type + " }"; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java new file mode 100644 index 0000000000..45f0a11d76 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError; + +/** + * In-memory implementation of {@link ApplicationTimelineStore}. This + * implementation is for test purpose only. If users improperly instantiate it, + * they may encounter reading and writing history data in different memory + * store. + * + */ +@Private +@Unstable +public class MemoryApplicationTimelineStore + extends AbstractService implements ApplicationTimelineStore { + + private Map entities = + new HashMap(); + + public MemoryApplicationTimelineStore() { + super(MemoryApplicationTimelineStore.class.getName()); + } + + @Override + public ATSEntities getEntities(String entityType, Long limit, + Long windowStart, Long windowEnd, NameValuePair primaryFilter, + Collection secondaryFilters, EnumSet fields) { + if (limit == null) { + limit = DEFAULT_LIMIT; + } + if (windowStart == null) { + windowStart = Long.MIN_VALUE; + } + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } + List entitiesSelected = new ArrayList(); + for (ATSEntity entity : new PriorityQueue(entities.values())) { + if (entitiesSelected.size() >= limit) { + break; + } + if (!entity.getEntityType().equals(entityType)) { + continue; + } + if (entity.getStartTime() <= windowStart) { + continue; + } + if (entity.getStartTime() > windowEnd) { + continue; + } + if (primaryFilter != null && + !matchFilter(entity.getPrimaryFilters(), primaryFilter)) { + continue; + } + if (secondaryFilters != null) { // OR logic + boolean flag = false; + for (NameValuePair secondaryFilter : secondaryFilters) { + if (secondaryFilter != null && + matchFilter(entity.getOtherInfo(), secondaryFilter)) { + flag = true; + break; + } + } + if (!flag) { + continue; + } + } + entitiesSelected.add(entity); + } + List entitiesToReturn = new ArrayList(); + for (ATSEntity entitySelected : entitiesSelected) { + entitiesToReturn.add(maskFields(entitySelected, fields)); + } + Collections.sort(entitiesToReturn); + ATSEntities entitiesWrapper = new ATSEntities(); + entitiesWrapper.setEntities(entitiesToReturn); + return entitiesWrapper; + } + + @Override + public ATSEntity getEntity(String entityId, String entityType, + EnumSet fieldsToRetrieve) { + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.allOf(Field.class); + } + ATSEntity entity = entities.get(new EntityId(entityId, entityType)); + if (entity == null) { + return null; + } else { + return maskFields(entity, fieldsToRetrieve); + } + } + + @Override + public ATSEvents getEntityTimelines(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, + Set eventTypes) { + ATSEvents allEvents = new ATSEvents(); + if (entityIds == null) { + return allEvents; + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + if (windowStart == null) { + windowStart = Long.MIN_VALUE; + } + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + for (String entityId : entityIds) { + EntityId entityID = new EntityId(entityId, entityType); + ATSEntity entity = entities.get(entityID); + if (entity == null) { + continue; + } + ATSEventsOfOneEntity events = new ATSEventsOfOneEntity(); + events.setEntityId(entityId); + events.setEntityType(entityType); + for (ATSEvent event : entity.getEvents()) { + if (events.getEvents().size() >= limit) { + break; + } + if (event.getTimestamp() <= windowStart) { + continue; + } + if (event.getTimestamp() > windowEnd) { + continue; + } + if (eventTypes != null && !eventTypes.contains(event.getEventType())) { + continue; + } + events.addEvent(event); + } + allEvents.addEvent(events); + } + return allEvents; + } + + @Override + public ATSPutErrors put(ATSEntities data) { + ATSPutErrors errors = new ATSPutErrors(); + for (ATSEntity entity : data.getEntities()) { + EntityId entityId = + new EntityId(entity.getEntityId(), entity.getEntityType()); + // store entity info in memory + ATSEntity existingEntity = entities.get(entityId); + if (existingEntity == null) { + existingEntity = new ATSEntity(); + existingEntity.setEntityId(entity.getEntityId()); + existingEntity.setEntityType(entity.getEntityType()); + existingEntity.setStartTime(entity.getStartTime()); + entities.put(entityId, existingEntity); + } + if (entity.getEvents() != null) { + if (existingEntity.getEvents() == null) { + existingEntity.setEvents(entity.getEvents()); + } else { + existingEntity.addEvents(entity.getEvents()); + } + Collections.sort(existingEntity.getEvents()); + } + // check startTime + if (existingEntity.getStartTime() == null) { + if (existingEntity.getEvents() == null + || existingEntity.getEvents().isEmpty()) { + ATSPutError error = new ATSPutError(); + error.setEntityId(entityId.getId()); + error.setEntityType(entityId.getType()); + error.setErrorCode(1); + errors.addError(error); + entities.remove(entityId); + continue; + } else { + existingEntity.setStartTime(entity.getEvents().get(0).getTimestamp()); + } + } + if (entity.getPrimaryFilters() != null) { + if (existingEntity.getPrimaryFilters() == null) { + existingEntity.setPrimaryFilters(entity.getPrimaryFilters()); + } else { + existingEntity.addPrimaryFilters(entity.getPrimaryFilters()); + } + } + if (entity.getOtherInfo() != null) { + if (existingEntity.getOtherInfo() == null) { + existingEntity.setOtherInfo(entity.getOtherInfo()); + } else { + existingEntity.addOtherInfo(entity.getOtherInfo()); + } + } + // relate it to other entities + if (entity.getRelatedEntities() == null) { + continue; + } + for (Map.Entry> partRelatedEntities : entity + .getRelatedEntities().entrySet()) { + if (partRelatedEntities == null) { + continue; + } + for (String idStr : partRelatedEntities.getValue()) { + EntityId relatedEntityId = + new EntityId(idStr, partRelatedEntities.getKey()); + ATSEntity relatedEntity = entities.get(relatedEntityId); + if (relatedEntity != null) { + relatedEntity.addRelatedEntity( + existingEntity.getEntityType(), existingEntity.getEntityId()); + } + } + } + } + return errors; + } + + private static ATSEntity maskFields( + ATSEntity entity, EnumSet fields) { + // Conceal the fields that are not going to be exposed + ATSEntity entityToReturn = new ATSEntity(); + entityToReturn.setEntityId(entity.getEntityId()); + entityToReturn.setEntityType(entity.getEntityType()); + entityToReturn.setStartTime(entity.getStartTime()); + entityToReturn.setEvents(fields.contains(Field.EVENTS) ? + entity.getEvents() : fields.contains(Field.LAST_EVENT_ONLY) ? + Arrays.asList(entity.getEvents().get(0)) : null); + entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ? + entity.getRelatedEntities() : null); + entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ? + entity.getPrimaryFilters() : null); + entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ? + entity.getOtherInfo() : null); + return entityToReturn; + } + + private static boolean matchFilter(Map tags, + NameValuePair filter) { + Object value = tags.get(filter.getName()); + if (value == null) { // doesn't have the filter + return false; + } else if (!value.equals(filter.getValue())) { // doesn't match the filter + return false; + } + return true; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java new file mode 100644 index 0000000000..5825af192b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java @@ -0,0 +1,532 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError; +import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineReader.Field; + +public class ApplicationTimelineStoreTestUtils { + + private static final Map EMPTY_MAP = Collections.emptyMap(); + private static final Map> EMPTY_REL_ENTITIES = + new HashMap>(); + + protected ApplicationTimelineStore store; + private String entity1; + private String entityType1; + private String entity1b; + private String entity2; + private String entityType2; + private Map primaryFilters; + private Map secondaryFilters; + private Map allFilters; + private Map otherInfo; + private Map> relEntityMap; + private NameValuePair userFilter; + private Collection goodTestingFilters; + private Collection badTestingFilters; + private ATSEvent ev1; + private ATSEvent ev2; + private ATSEvent ev3; + private ATSEvent ev4; + private Map eventInfo; + private List events1; + private List events2; + + /** + * Load test data into the given store + */ + protected void loadTestData() { + ATSEntities atsEntities = new ATSEntities(); + Map primaryFilters = new HashMap(); + primaryFilters.put("user", "username"); + primaryFilters.put("appname", 12345l); + Map secondaryFilters = new HashMap(); + secondaryFilters.put("startTime", 123456l); + secondaryFilters.put("status", "RUNNING"); + Map otherInfo1 = new HashMap(); + otherInfo1.put("info1", "val1"); + otherInfo1.putAll(secondaryFilters); + + String entity1 = "id_1"; + String entityType1 = "type_1"; + String entity1b = "id_2"; + String entity2 = "id_2"; + String entityType2 = "type_2"; + + Map> relatedEntities = + new HashMap>(); + relatedEntities.put(entityType2, Collections.singletonList(entity2)); + + ATSEvent ev3 = createEvent(789l, "launch_event", null); + ATSEvent ev4 = createEvent(-123l, "init_event", null); + List events = new ArrayList(); + events.add(ev3); + events.add(ev4); + atsEntities.setEntities(Collections.singletonList(createEntity(entity2, + entityType2, null, events, null, null, null))); + ATSPutErrors response = store.put(atsEntities); + assertEquals(0, response.getErrors().size()); + + ATSEvent ev1 = createEvent(123l, "start_event", null); + atsEntities.setEntities(Collections.singletonList(createEntity(entity1, + entityType1, 123l, Collections.singletonList(ev1), + relatedEntities, primaryFilters, otherInfo1))); + response = store.put(atsEntities); + assertEquals(0, response.getErrors().size()); + atsEntities.setEntities(Collections.singletonList(createEntity(entity1b, + entityType1, null, Collections.singletonList(ev1), relatedEntities, + primaryFilters, otherInfo1))); + response = store.put(atsEntities); + assertEquals(0, response.getErrors().size()); + + Map eventInfo = new HashMap(); + eventInfo.put("event info 1", "val1"); + ATSEvent ev2 = createEvent(456l, "end_event", eventInfo); + Map otherInfo2 = new HashMap(); + otherInfo2.put("info2", "val2"); + atsEntities.setEntities(Collections.singletonList(createEntity(entity1, + entityType1, null, Collections.singletonList(ev2), null, + primaryFilters, otherInfo2))); + response = store.put(atsEntities); + assertEquals(0, response.getErrors().size()); + atsEntities.setEntities(Collections.singletonList(createEntity(entity1b, + entityType1, 123l, Collections.singletonList(ev2), null, + primaryFilters, otherInfo2))); + response = store.put(atsEntities); + assertEquals(0, response.getErrors().size()); + + atsEntities.setEntities(Collections.singletonList(createEntity( + "badentityid", "badentity", null, null, null, null, otherInfo1))); + response = store.put(atsEntities); + assertEquals(1, response.getErrors().size()); + ATSPutError error = response.getErrors().get(0); + assertEquals("badentityid", error.getEntityId()); + assertEquals("badentity", error.getEntityType()); + assertEquals((Integer) 1, error.getErrorCode()); + } + + /** + * Load veification data + */ + protected void loadVerificationData() throws Exception { + userFilter = new NameValuePair("user", + "username"); + goodTestingFilters = new ArrayList(); + goodTestingFilters.add(new NameValuePair("appname", 12345l)); + goodTestingFilters.add(new NameValuePair("status", "RUNNING")); + badTestingFilters = new ArrayList(); + badTestingFilters.add(new NameValuePair("appname", 12345l)); + badTestingFilters.add(new NameValuePair("status", "FINISHED")); + + primaryFilters = new HashMap(); + primaryFilters.put("user", "username"); + primaryFilters.put("appname", 12345l); + secondaryFilters = new HashMap(); + secondaryFilters.put("startTime", 123456l); + secondaryFilters.put("status", "RUNNING"); + allFilters = new HashMap(); + allFilters.putAll(secondaryFilters); + allFilters.putAll(primaryFilters); + otherInfo = new HashMap(); + otherInfo.put("info1", "val1"); + otherInfo.put("info2", "val2"); + otherInfo.putAll(secondaryFilters); + + entity1 = "id_1"; + entityType1 = "type_1"; + entity1b = "id_2"; + entity2 = "id_2"; + entityType2 = "type_2"; + + ev1 = createEvent(123l, "start_event", null); + + eventInfo = new HashMap(); + eventInfo.put("event info 1", "val1"); + ev2 = createEvent(456l, "end_event", eventInfo); + events1 = new ArrayList(); + events1.add(ev2); + events1.add(ev1); + + relEntityMap = + new HashMap>(); + List ids = new ArrayList(); + ids.add(entity1); + ids.add(entity1b); + relEntityMap.put(entityType1, ids); + + ev3 = createEvent(789l, "launch_event", null); + ev4 = createEvent(-123l, "init_event", null); + events2 = new ArrayList(); + events2.add(ev3); + events2.add(ev4); + } + + public void testGetSingleEntity() { + // test getting entity info + verifyEntityInfo(null, null, null, null, null, null, + store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class))); + + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, store.getEntity(entity1, entityType1, + EnumSet.allOf(Field.class))); + + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, store.getEntity(entity1b, entityType1, + EnumSet.allOf(Field.class))); + + verifyEntityInfo(entity2, entityType2, events2, relEntityMap, EMPTY_MAP, + EMPTY_MAP, store.getEntity(entity2, entityType2, + EnumSet.allOf(Field.class))); + + // test getting single fields + verifyEntityInfo(entity1, entityType1, events1, null, null, null, + store.getEntity(entity1, entityType1, EnumSet.of(Field.EVENTS))); + + verifyEntityInfo(entity1, entityType1, Collections.singletonList(ev2), + null, null, null, store.getEntity(entity1, entityType1, + EnumSet.of(Field.LAST_EVENT_ONLY))); + + verifyEntityInfo(entity1, entityType1, null, null, primaryFilters, null, + store.getEntity(entity1, entityType1, + EnumSet.of(Field.PRIMARY_FILTERS))); + + verifyEntityInfo(entity1, entityType1, null, null, null, otherInfo, + store.getEntity(entity1, entityType1, EnumSet.of(Field.OTHER_INFO))); + + verifyEntityInfo(entity2, entityType2, null, relEntityMap, null, null, + store.getEntity(entity2, entityType2, + EnumSet.of(Field.RELATED_ENTITIES))); + } + + public void testGetEntities() { + // test getting entities + assertEquals("nonzero entities size for nonexistent type", 0, + store.getEntities("type_0", null, null, null, null, null, + null).getEntities().size()); + assertEquals("nonzero entities size for nonexistent type", 0, + store.getEntities("type_3", null, null, null, null, null, + null).getEntities().size()); + assertEquals("nonzero entities size for nonexistent type", 0, + store.getEntities("type_0", null, null, null, userFilter, + null, null).getEntities().size()); + assertEquals("nonzero entities size for nonexistent type", 0, + store.getEntities("type_3", null, null, null, userFilter, + null, null).getEntities().size()); + + List entities = + store.getEntities("type_1", null, null, null, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(2, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + + entities = store.getEntities("type_2", null, null, null, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(1, entities.size()); + verifyEntityInfo(entity2, entityType2, events2, relEntityMap, EMPTY_MAP, + EMPTY_MAP, entities.get(0)); + + entities = store.getEntities("type_1", 1l, null, null, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(1, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + + entities = store.getEntities("type_1", 1l, 0l, null, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(1, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + + entities = store.getEntities("type_1", null, 234l, null, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(0, entities.size()); + + entities = store.getEntities("type_1", null, 123l, null, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(0, entities.size()); + + entities = store.getEntities("type_1", null, 234l, 345l, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(0, entities.size()); + + entities = store.getEntities("type_1", null, null, 345l, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(2, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + + entities = store.getEntities("type_1", null, null, 123l, null, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(2, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + } + + public void testGetEntitiesWithPrimaryFilters() { + // test using primary filter + assertEquals("nonzero entities size for primary filter", 0, + store.getEntities("type_1", null, null, null, + new NameValuePair("none", "none"), null, + EnumSet.allOf(Field.class)).getEntities().size()); + assertEquals("nonzero entities size for primary filter", 0, + store.getEntities("type_2", null, null, null, + new NameValuePair("none", "none"), null, + EnumSet.allOf(Field.class)).getEntities().size()); + assertEquals("nonzero entities size for primary filter", 0, + store.getEntities("type_3", null, null, null, + new NameValuePair("none", "none"), null, + EnumSet.allOf(Field.class)).getEntities().size()); + + List entities = store.getEntities("type_1", null, null, null, + userFilter, null, EnumSet.allOf(Field.class)).getEntities(); + assertEquals(2, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + + entities = store.getEntities("type_2", null, null, null, userFilter, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(0, entities.size()); + + entities = store.getEntities("type_1", 1l, null, null, userFilter, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(1, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + + entities = store.getEntities("type_1", 1l, 0l, null, userFilter, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(1, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + + entities = store.getEntities("type_1", null, 234l, null, userFilter, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(0, entities.size()); + + entities = store.getEntities("type_1", null, 234l, 345l, userFilter, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(0, entities.size()); + + entities = store.getEntities("type_1", null, null, 345l, userFilter, null, + EnumSet.allOf(Field.class)).getEntities(); + assertEquals(2, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + } + + public void testGetEntitiesWithSecondaryFilters() { + // test using secondary filter + List entities = store.getEntities("type_1", null, null, null, + null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities(); + assertEquals(2, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + + entities = store.getEntities("type_1", null, null, null, userFilter, + goodTestingFilters, EnumSet.allOf(Field.class)).getEntities(); + assertEquals(2, entities.size()); + verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + + entities = store.getEntities("type_1", null, null, null, null, + badTestingFilters, EnumSet.allOf(Field.class)).getEntities(); + assertEquals(0, entities.size()); + + entities = store.getEntities("type_1", null, null, null, userFilter, + badTestingFilters, EnumSet.allOf(Field.class)).getEntities(); + assertEquals(0, entities.size()); + } + + public void testGetEvents() { + // test getting entity timelines + SortedSet sortedSet = new TreeSet(); + sortedSet.add(entity1); + List timelines = + store.getEntityTimelines(entityType1, sortedSet, null, null, + null, null).getAllEvents(); + assertEquals(1, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2, ev1); + + sortedSet.add(entity1b); + timelines = store.getEntityTimelines(entityType1, sortedSet, null, + null, null, null).getAllEvents(); + assertEquals(2, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2, ev1); + verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2, ev1); + + timelines = store.getEntityTimelines(entityType1, sortedSet, 1l, + null, null, null).getAllEvents(); + assertEquals(2, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2); + verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2); + + timelines = store.getEntityTimelines(entityType1, sortedSet, null, + 345l, null, null).getAllEvents(); + assertEquals(2, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2); + verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2); + + timelines = store.getEntityTimelines(entityType1, sortedSet, null, + 123l, null, null).getAllEvents(); + assertEquals(2, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2); + verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2); + + timelines = store.getEntityTimelines(entityType1, sortedSet, null, + null, 345l, null).getAllEvents(); + assertEquals(2, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev1); + verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev1); + + timelines = store.getEntityTimelines(entityType1, sortedSet, null, + null, 123l, null).getAllEvents(); + assertEquals(2, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev1); + verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev1); + + timelines = store.getEntityTimelines(entityType1, sortedSet, null, + null, null, Collections.singleton("end_event")).getAllEvents(); + assertEquals(2, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2); + verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2); + + sortedSet.add(entity2); + timelines = store.getEntityTimelines(entityType2, sortedSet, null, + null, null, null).getAllEvents(); + assertEquals(1, timelines.size()); + verifyEntityTimeline(timelines.get(0), entity2, entityType2, ev3, ev4); + } + + /** + * Verify a single entity + */ + private static void verifyEntityInfo(String entity, String entityType, + List events, Map> relatedEntities, + Map primaryFilters, Map otherInfo, + ATSEntity retrievedEntityInfo) { + if (entity == null) { + assertNull(retrievedEntityInfo); + return; + } + assertEquals(entity, retrievedEntityInfo.getEntityId()); + assertEquals(entityType, retrievedEntityInfo.getEntityType()); + if (events == null) + assertNull(retrievedEntityInfo.getEvents()); + else + assertEquals(events, retrievedEntityInfo.getEvents()); + if (relatedEntities == null) + assertNull(retrievedEntityInfo.getRelatedEntities()); + else + assertEquals(relatedEntities, retrievedEntityInfo.getRelatedEntities()); + if (primaryFilters == null) + assertNull(retrievedEntityInfo.getPrimaryFilters()); + else + assertTrue(primaryFilters.equals( + retrievedEntityInfo.getPrimaryFilters())); + if (otherInfo == null) + assertNull(retrievedEntityInfo.getOtherInfo()); + else + assertTrue(otherInfo.equals(retrievedEntityInfo.getOtherInfo())); + } + + /** + * Verify timeline events + */ + private static void verifyEntityTimeline( + ATSEventsOfOneEntity retrievedEvents, String entity, String entityType, + ATSEvent... actualEvents) { + assertEquals(entity, retrievedEvents.getEntityId()); + assertEquals(entityType, retrievedEvents.getEntityType()); + assertEquals(actualEvents.length, retrievedEvents.getEvents().size()); + for (int i = 0; i < actualEvents.length; i++) { + assertEquals(actualEvents[i], retrievedEvents.getEvents().get(i)); + } + } + + /** + * Create a test entity + */ + private static ATSEntity createEntity(String entity, String entityType, + Long startTime, List events, + Map> relatedEntities, + Map primaryFilters, Map otherInfo) { + ATSEntity atsEntity = new ATSEntity(); + atsEntity.setEntityId(entity); + atsEntity.setEntityType(entityType); + atsEntity.setStartTime(startTime); + atsEntity.setEvents(events); + if (relatedEntities != null) + for (Entry> e : relatedEntities.entrySet()) + for (String v : e.getValue()) + atsEntity.addRelatedEntity(e.getKey(), v); + else + atsEntity.setRelatedEntities(null); + atsEntity.setPrimaryFilters(primaryFilters); + atsEntity.setOtherInfo(otherInfo); + return atsEntity; + } + + /** + * Create a test event + */ + private static ATSEvent createEvent(long timestamp, String type, Map info) { + ATSEvent event = new ATSEvent(); + event.setTimestamp(timestamp); + event.setEventType(type); + event.setEventInfo(info); + return event; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java new file mode 100644 index 0000000000..aa88b74a90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestMemoryApplicationTimelineStore + extends ApplicationTimelineStoreTestUtils { + + @Before + public void setup() throws Exception { + store = new MemoryApplicationTimelineStore(); + store.init(new YarnConfiguration()); + store.start(); + loadTestData(); + loadVerificationData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + } + + public ApplicationTimelineStore getApplicationTimelineStore() { + return store; + } + + @Test + public void testGetSingleEntity() { + super.testGetSingleEntity(); + } + + @Test + public void testGetEntities() { + super.testGetEntities(); + } + + @Test + public void testGetEntitiesWithPrimaryFilters() { + super.testGetEntitiesWithPrimaryFilters(); + } + + @Test + public void testGetEntitiesWithSecondaryFilters() { + super.testGetEntitiesWithSecondaryFilters(); + } + + @Test + public void testGetEvents() { + super.testGetEvents(); + } + +}