diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b866b2f88b..9df803b1ef 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -105,6 +105,8 @@ Release 2.6.0 - UNRELEASED YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen) + YARN-2302. Refactor TimelineWebServices. (Zhijie Shen via junping_du) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index ce05d50398..29bbd21733 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer; @@ -59,12 +60,12 @@ public class ApplicationHistoryServer extends CompositeService { private static final Log LOG = LogFactory .getLog(ApplicationHistoryServer.class); - protected ApplicationHistoryClientService ahsClientService; - protected ApplicationHistoryManager historyManager; - protected TimelineStore timelineStore; - protected TimelineDelegationTokenSecretManagerService secretManagerService; - protected TimelineACLsManager timelineACLsManager; - protected WebApp webApp; + private ApplicationHistoryClientService ahsClientService; + private ApplicationHistoryManager historyManager; + private TimelineStore timelineStore; + private TimelineDelegationTokenSecretManagerService secretManagerService; + private TimelineDataManager timelineDataManager; + private WebApp webApp; public ApplicationHistoryServer() { super(ApplicationHistoryServer.class.getName()); @@ -72,15 +73,18 @@ public ApplicationHistoryServer() { @Override protected void serviceInit(Configuration conf) throws Exception { - historyManager = createApplicationHistory(); - ahsClientService = createApplicationHistoryClientService(historyManager); - addService(ahsClientService); - addService((Service) historyManager); + // init timeline services first timelineStore = createTimelineStore(conf); addIfService(timelineStore); secretManagerService = createTimelineDelegationTokenSecretManagerService(conf); addService(secretManagerService); - timelineACLsManager = createTimelineACLsManager(conf); + timelineDataManager = createTimelineDataManager(conf); + + // init generic history service afterwards + historyManager = createApplicationHistoryManager(conf); + ahsClientService = createApplicationHistoryClientService(historyManager); + addService(ahsClientService); + addService((Service) historyManager); DefaultMetricsSystem.initialize("ApplicationHistoryServer"); JvmMetrics.initSingleton("ApplicationHistoryServer", null); @@ -111,21 +115,22 @@ protected void serviceStop() throws Exception { @Private @VisibleForTesting - public ApplicationHistoryClientService getClientService() { + ApplicationHistoryClientService getClientService() { return this.ahsClientService; } - protected ApplicationHistoryClientService - createApplicationHistoryClientService( - ApplicationHistoryManager historyManager) { - return new ApplicationHistoryClientService(historyManager); + /** + * @return ApplicationTimelineStore + */ + @Private + @VisibleForTesting + public TimelineStore getTimelineStore() { + return timelineStore; } - protected ApplicationHistoryManager createApplicationHistory() { - return new ApplicationHistoryManagerImpl(); - } - - protected ApplicationHistoryManager getApplicationHistory() { + @Private + @VisibleForTesting + ApplicationHistoryManager getApplicationHistoryManager() { return this.historyManager; } @@ -154,28 +159,35 @@ public static void main(String[] args) { launchAppHistoryServer(args); } - protected ApplicationHistoryManager createApplicationHistoryManager( + private ApplicationHistoryClientService + createApplicationHistoryClientService( + ApplicationHistoryManager historyManager) { + return new ApplicationHistoryClientService(historyManager); + } + + private ApplicationHistoryManager createApplicationHistoryManager( Configuration conf) { return new ApplicationHistoryManagerImpl(); } - protected TimelineStore createTimelineStore( + private TimelineStore createTimelineStore( Configuration conf) { return ReflectionUtils.newInstance(conf.getClass( YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class, TimelineStore.class), conf); } - protected TimelineDelegationTokenSecretManagerService + private TimelineDelegationTokenSecretManagerService createTimelineDelegationTokenSecretManagerService(Configuration conf) { return new TimelineDelegationTokenSecretManagerService(); } - protected TimelineACLsManager createTimelineACLsManager(Configuration conf) { - return new TimelineACLsManager(conf); + private TimelineDataManager createTimelineDataManager(Configuration conf) { + return new TimelineDataManager( + timelineStore, new TimelineACLsManager(conf)); } - protected void startWebApp() { + private void startWebApp() { Configuration conf = getConfig(); // Always load pseudo authentication filter to parse "user.name" in an URL // to identify a HTTP request's user in insecure mode. @@ -199,9 +211,8 @@ protected void startWebApp() { try { AHSWebApp ahsWebApp = AHSWebApp.getInstance(); ahsWebApp.setApplicationHistoryManager(historyManager); - ahsWebApp.setTimelineStore(timelineStore); ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService); - ahsWebApp.setTimelineACLsManager(timelineACLsManager); + ahsWebApp.setTimelineDataManager(timelineDataManager); webApp = WebApps .$for("applicationhistory", ApplicationHistoryClientService.class, @@ -213,14 +224,6 @@ protected void startWebApp() { throw new YarnRuntimeException(msg, e); } } - /** - * @return ApplicationTimelineStore - */ - @Private - @VisibleForTesting - public TimelineStore getTimelineStore() { - return timelineStore; - } private void doSecureLogin(Configuration conf) throws IOException { InetSocketAddress socAddr = getBindAddress(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java index 9901eeb425..68541d8352 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java @@ -22,8 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.server.api.ApplicationContext; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager; -import org.apache.hadoop.yarn.server.timeline.TimelineStore; -import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService; import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; @@ -36,9 +35,8 @@ public class AHSWebApp extends WebApp implements YarnWebParams { private ApplicationHistoryManager applicationHistoryManager; - private TimelineStore timelineStore; private TimelineDelegationTokenSecretManagerService secretManagerService; - private TimelineACLsManager timelineACLsManager; + private TimelineDataManager timelineDataManager; private static AHSWebApp instance = null; @@ -68,14 +66,6 @@ public void setApplicationHistoryManager( this.applicationHistoryManager = applicationHistoryManager; } - public TimelineStore getTimelineStore() { - return timelineStore; - } - - public void setTimelineStore(TimelineStore timelineStore) { - this.timelineStore = timelineStore; - } - public TimelineDelegationTokenSecretManagerService getTimelineDelegationTokenSecretManagerService() { return secretManagerService; @@ -86,12 +76,12 @@ public void setTimelineDelegationTokenSecretManagerService( this.secretManagerService = secretManagerService; } - public TimelineACLsManager getTimelineACLsManager() { - return timelineACLsManager; + public TimelineDataManager getTimelineDataManager() { + return timelineDataManager; } - public void setTimelineACLsManager(TimelineACLsManager timelineACLsManager) { - this.timelineACLsManager = timelineACLsManager; + public void setTimelineDataManager(TimelineDataManager timelineDataManager) { + this.timelineDataManager = timelineDataManager; } @Override @@ -101,10 +91,9 @@ public void setup() { bind(TimelineWebServices.class); bind(GenericExceptionHandler.class); bind(ApplicationContext.class).toInstance(applicationHistoryManager); - bind(TimelineStore.class).toInstance(timelineStore); bind(TimelineDelegationTokenSecretManagerService.class).toInstance( secretManagerService); - bind(TimelineACLsManager.class).toInstance(timelineACLsManager); + bind(TimelineDataManager.class).toInstance(timelineDataManager); route("/", AHSController.class); route(pajoin("/apps", APP_STATE), AHSController.class); route(pajoin("/app", APPLICATION_ID), AHSController.class, "app"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java new file mode 100644 index 0000000000..e68e860418 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.SortedSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +/** + * The class wrap over the timeline store and the ACLs manager. It does some non + * trivial manipulation of the timeline data before putting or after getting it + * from the timeline store, and checks the user's access to it. + * + */ +public class TimelineDataManager { + + private static final Log LOG = LogFactory.getLog(TimelineDataManager.class); + + private TimelineStore store; + private TimelineACLsManager timelineACLsManager; + + public TimelineDataManager(TimelineStore store, + TimelineACLsManager timelineACLsManager) { + this.store = store; + this.timelineACLsManager = timelineACLsManager; + } + + /** + * Get the timeline entities that the given user have access to. The meaning + * of each argument has been documented with + * {@link TimelineReader#getEntities}. + * + * @see TimelineReader#getEntities + */ + public TimelineEntities getEntities( + String entityType, + NameValuePair primaryFilter, + Collection secondaryFilter, + Long windowStart, + Long windowEnd, + String fromId, + Long fromTs, + Long limit, + EnumSet fields, + UserGroupInformation callerUGI) throws YarnException, IOException { + TimelineEntities entities = null; + boolean modified = extendFields(fields); + entities = store.getEntities( + entityType, + limit, + windowStart, + windowEnd, + fromId, + fromTs, + primaryFilter, + secondaryFilter, + fields); + if (entities != null) { + Iterator entitiesItr = + entities.getEntities().iterator(); + while (entitiesItr.hasNext()) { + TimelineEntity entity = entitiesItr.next(); + try { + // check ACLs + if (!timelineACLsManager.checkAccess(callerUGI, entity)) { + entitiesItr.remove(); + } else { + // clean up system data + if (modified) { + entity.setPrimaryFilters(null); + } else { + cleanupOwnerInfo(entity); + } + } + } catch (YarnException e) { + LOG.error("Error when verifying access for user " + callerUGI + + " on the events of the timeline entity " + + new EntityIdentifier(entity.getEntityId(), + entity.getEntityType()), e); + entitiesItr.remove(); + } + } + } + if (entities == null) { + return new TimelineEntities(); + } + return entities; + } + + /** + * Get the single timeline entity that the given user has access to. The + * meaning of each argument has been documented with + * {@link TimelineReader#getEntity}. + * + * @see TimelineReader#getEntity + */ + public TimelineEntity getEntity( + String entityType, + String entityId, + EnumSet fields, + UserGroupInformation callerUGI) throws YarnException, IOException { + TimelineEntity entity = null; + boolean modified = extendFields(fields); + entity = + store.getEntity(entityId, entityType, fields); + if (entity != null) { + // check ACLs + if (!timelineACLsManager.checkAccess(callerUGI, entity)) { + entity = null; + } else { + // clean up the system data + if (modified) { + entity.setPrimaryFilters(null); + } else { + cleanupOwnerInfo(entity); + } + } + } + return entity; + } + + /** + * Get the events whose entities the given user has access to. The meaning of + * each argument has been documented with + * {@link TimelineReader#getEntityTimelines}. + * + * @see TimelineReader#getEntityTimelines + */ + public TimelineEvents getEvents( + String entityType, + SortedSet entityIds, + SortedSet eventTypes, + Long windowStart, + Long windowEnd, + Long limit, + UserGroupInformation callerUGI) throws YarnException, IOException { + TimelineEvents events = null; + events = store.getEntityTimelines( + entityType, + entityIds, + limit, + windowStart, + windowEnd, + eventTypes); + if (events != null) { + Iterator eventsItr = + events.getAllEvents().iterator(); + while (eventsItr.hasNext()) { + TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next(); + try { + TimelineEntity entity = store.getEntity( + eventsOfOneEntity.getEntityId(), + eventsOfOneEntity.getEntityType(), + EnumSet.of(Field.PRIMARY_FILTERS)); + // check ACLs + if (!timelineACLsManager.checkAccess(callerUGI, entity)) { + eventsItr.remove(); + } + } catch (Exception e) { + LOG.error("Error when verifying access for user " + callerUGI + + " on the events of the timeline entity " + + new EntityIdentifier(eventsOfOneEntity.getEntityId(), + eventsOfOneEntity.getEntityType()), e); + eventsItr.remove(); + } + } + } + if (events == null) { + return new TimelineEvents(); + } + return events; + } + + /** + * Store the timeline entities into the store and set the owner of them to the + * given user. + */ + public TimelinePutResponse postEntities( + TimelineEntities entities, + UserGroupInformation callerUGI) throws YarnException, IOException { + if (entities == null) { + return new TimelinePutResponse(); + } + List entityIDs = new ArrayList(); + TimelineEntities entitiesToPut = new TimelineEntities(); + List errors = + new ArrayList(); + for (TimelineEntity entity : entities.getEntities()) { + EntityIdentifier entityID = + new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); + + // check if there is existing entity + TimelineEntity existingEntity = null; + try { + existingEntity = + store.getEntity(entityID.getId(), entityID.getType(), + EnumSet.of(Field.PRIMARY_FILTERS)); + if (existingEntity != null + && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) { + throw new YarnException("The timeline entity " + entityID + + " was not put by " + callerUGI + " before"); + } + } catch (Exception e) { + // Skip the entity which already exists and was put by others + LOG.error("Skip the timeline entity: " + entityID + ", because " + + e.getMessage()); + TimelinePutResponse.TimelinePutError error = + new TimelinePutResponse.TimelinePutError(); + error.setEntityId(entityID.getId()); + error.setEntityType(entityID.getType()); + error.setErrorCode( + TimelinePutResponse.TimelinePutError.ACCESS_DENIED); + errors.add(error); + continue; + } + + // inject owner information for the access check if this is the first + // time to post the entity, in case it's the admin who is updating + // the timeline data. + try { + if (existingEntity == null) { + injectOwnerInfo(entity, callerUGI.getShortUserName()); + } + } catch (YarnException e) { + // Skip the entity which messes up the primary filter and record the + // error + LOG.error("Skip the timeline entity: " + entityID + ", because " + + e.getMessage()); + TimelinePutResponse.TimelinePutError error = + new TimelinePutResponse.TimelinePutError(); + error.setEntityId(entityID.getId()); + error.setEntityType(entityID.getType()); + error.setErrorCode( + TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT); + errors.add(error); + continue; + } + + entityIDs.add(entityID); + entitiesToPut.addEntity(entity); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing the entity " + entityID + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs)); + } + TimelinePutResponse response = store.put(entitiesToPut); + // add the errors of timeline system filter key conflict + response.addErrors(errors); + return response; + } + + private static boolean extendFields(EnumSet fieldEnums) { + boolean modified = false; + if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) { + fieldEnums.add(Field.PRIMARY_FILTERS); + modified = true; + } + return modified; + } + + private static void injectOwnerInfo(TimelineEntity timelineEntity, + String owner) throws YarnException { + if (timelineEntity.getPrimaryFilters() != null && + timelineEntity.getPrimaryFilters().containsKey( + TimelineStore.SystemFilter.ENTITY_OWNER.toString())) { + throw new YarnException( + "User should not use the timeline system filter key: " + + TimelineStore.SystemFilter.ENTITY_OWNER); + } + timelineEntity.addPrimaryFilter( + TimelineStore.SystemFilter.ENTITY_OWNER + .toString(), owner); + } + + private static void cleanupOwnerInfo(TimelineEntity timelineEntity) { + if (timelineEntity.getPrimaryFilters() != null) { + timelineEntity.getPrimaryFilters().remove( + TimelineStore.SystemFilter.ENTITY_OWNER.toString()); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java index ad739c94c6..c5e6d49c8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java @@ -18,14 +18,10 @@ package org.apache.hadoop.yarn.server.timeline.webapp; -import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER; - -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.SortedSet; @@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.EntityIdentifier; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timeline.TimelineStore; -import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -80,14 +73,11 @@ public class TimelineWebServices { private static final Log LOG = LogFactory.getLog(TimelineWebServices.class); - private TimelineStore store; - private TimelineACLsManager timelineACLsManager; + private TimelineDataManager timelineDataManager; @Inject - public TimelineWebServices(TimelineStore store, - TimelineACLsManager timelineACLsManager) { - this.store = store; - this.timelineACLsManager = timelineACLsManager; + public TimelineWebServices(TimelineDataManager timelineDataManager) { + this.timelineDataManager = timelineDataManager; } @XmlRootElement(name = "about") @@ -148,61 +138,28 @@ public TimelineEntities getEntities( @QueryParam("limit") String limit, @QueryParam("fields") String fields) { init(res); - TimelineEntities entities = null; try { - EnumSet fieldEnums = parseFieldsStr(fields, ","); - boolean modified = extendFields(fieldEnums); - UserGroupInformation callerUGI = getUser(req); - entities = store.getEntities( + return timelineDataManager.getEntities( parseStr(entityType), - parseLongStr(limit), + parsePairStr(primaryFilter, ":"), + parsePairsStr(secondaryFilter, ",", ":"), parseLongStr(windowStart), parseLongStr(windowEnd), parseStr(fromId), parseLongStr(fromTs), - parsePairStr(primaryFilter, ":"), - parsePairsStr(secondaryFilter, ",", ":"), - fieldEnums); - if (entities != null) { - Iterator entitiesItr = - entities.getEntities().iterator(); - while (entitiesItr.hasNext()) { - TimelineEntity entity = entitiesItr.next(); - try { - // check ACLs - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - entitiesItr.remove(); - } else { - // clean up system data - if (modified) { - entity.setPrimaryFilters(null); - } else { - cleanupOwnerInfo(entity); - } - } - } catch (YarnException e) { - LOG.error("Error when verifying access for user " + callerUGI - + " on the events of the timeline entity " - + new EntityIdentifier(entity.getEntityId(), - entity.getEntityType()), e); - entitiesItr.remove(); - } - } - } + parseLongStr(limit), + parseFieldsStr(fields, ","), + getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( "windowStart, windowEnd or limit is not a numeric value."); } catch (IllegalArgumentException e) { throw new BadRequestException("requested invalid field."); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error getting entities", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } - if (entities == null) { - return new TimelineEntities(); - } - return entities; } /** @@ -220,33 +177,15 @@ public TimelineEntity getEntity( init(res); TimelineEntity entity = null; try { - EnumSet fieldEnums = parseFieldsStr(fields, ","); - boolean modified = extendFields(fieldEnums); - entity = - store.getEntity(parseStr(entityId), parseStr(entityType), - fieldEnums); - if (entity != null) { - // check ACLs - UserGroupInformation callerUGI = getUser(req); - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - entity = null; - } else { - // clean up the system data - if (modified) { - entity.setPrimaryFilters(null); - } else { - cleanupOwnerInfo(entity); - } - } - } + entity = timelineDataManager.getEntity( + parseStr(entityType), + parseStr(entityId), + parseFieldsStr(fields, ","), + getUser(req)); } catch (IllegalArgumentException e) { throw new BadRequestException( "requested invalid field."); - } catch (IOException e) { - LOG.error("Error getting entity", e); - throw new WebApplicationException(e, - Response.Status.INTERNAL_SERVER_ERROR); - } catch (YarnException e) { + } catch (Exception e) { LOG.error("Error getting entity", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -275,51 +214,23 @@ public TimelineEvents getEvents( @QueryParam("windowEnd") String windowEnd, @QueryParam("limit") String limit) { init(res); - TimelineEvents events = null; try { - UserGroupInformation callerUGI = getUser(req); - events = store.getEntityTimelines( + return timelineDataManager.getEvents( parseStr(entityType), parseArrayStr(entityId, ","), - parseLongStr(limit), + parseArrayStr(eventType, ","), parseLongStr(windowStart), parseLongStr(windowEnd), - parseArrayStr(eventType, ",")); - if (events != null) { - Iterator eventsItr = - events.getAllEvents().iterator(); - while (eventsItr.hasNext()) { - TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next(); - try { - TimelineEntity entity = store.getEntity( - eventsOfOneEntity.getEntityId(), - eventsOfOneEntity.getEntityType(), - EnumSet.of(Field.PRIMARY_FILTERS)); - // check ACLs - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - eventsItr.remove(); - } - } catch (Exception e) { - LOG.error("Error when verifying access for user " + callerUGI - + " on the events of the timeline entity " - + new EntityIdentifier(eventsOfOneEntity.getEntityId(), - eventsOfOneEntity.getEntityType()), e); - eventsItr.remove(); - } - } - } + parseLongStr(limit), + getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( "windowStart, windowEnd or limit is not a numeric value."); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error getting entity timelines", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } - if (events == null) { - return new TimelineEvents(); - } - return events; } /** @@ -333,9 +244,6 @@ public TimelinePutResponse postEntities( @Context HttpServletResponse res, TimelineEntities entities) { init(res); - if (entities == null) { - return new TimelinePutResponse(); - } UserGroupInformation callerUGI = getUser(req); if (callerUGI == null) { String msg = "The owner of the posted timeline entities is not set"; @@ -343,76 +251,8 @@ public TimelinePutResponse postEntities( throw new ForbiddenException(msg); } try { - List entityIDs = new ArrayList(); - TimelineEntities entitiesToPut = new TimelineEntities(); - List errors = - new ArrayList(); - for (TimelineEntity entity : entities.getEntities()) { - EntityIdentifier entityID = - new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); - - // check if there is existing entity - TimelineEntity existingEntity = null; - try { - existingEntity = - store.getEntity(entityID.getId(), entityID.getType(), - EnumSet.of(Field.PRIMARY_FILTERS)); - if (existingEntity != null - && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) { - throw new YarnException("The timeline entity " + entityID - + " was not put by " + callerUGI + " before"); - } - } catch (Exception e) { - // Skip the entity which already exists and was put by others - LOG.warn("Skip the timeline entity: " + entityID + ", because " - + e.getMessage()); - TimelinePutResponse.TimelinePutError error = - new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); - error.setErrorCode( - TimelinePutResponse.TimelinePutError.ACCESS_DENIED); - errors.add(error); - continue; - } - - // inject owner information for the access check if this is the first - // time to post the entity, in case it's the admin who is updating - // the timeline data. - try { - if (existingEntity == null) { - injectOwnerInfo(entity, callerUGI.getShortUserName()); - } - } catch (YarnException e) { - // Skip the entity which messes up the primary filter and record the - // error - LOG.warn("Skip the timeline entity: " + entityID + ", because " - + e.getMessage()); - TimelinePutResponse.TimelinePutError error = - new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); - error.setErrorCode( - TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT); - errors.add(error); - continue; - } - - entityIDs.add(entityID); - entitiesToPut.addEntity(entity); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing the entity " + entityID + ", JSON-style content: " - + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs)); - } - TimelinePutResponse response = store.put(entitiesToPut); - // add the errors of timeline system filter key conflict - response.addErrors(errors); - return response; - } catch (IOException e) { + return timelineDataManager.postEntities(entities, callerUGI); + } catch (Exception e) { LOG.error("Error putting entities", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -423,6 +263,15 @@ private void init(HttpServletResponse response) { response.setContentType(null); } + private static UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUGI; + } + private static SortedSet parseArrayStr(String str, String delimiter) { if (str == null) { return null; @@ -495,14 +344,6 @@ private static EnumSet parseFieldsStr(String str, String delimiter) { } } - private static boolean extendFields(EnumSet fieldEnums) { - boolean modified = false; - if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) { - fieldEnums.add(Field.PRIMARY_FILTERS); - modified = true; - } - return modified; - } private static Long parseLongStr(String str) { return str == null ? null : Long.parseLong(str.trim()); } @@ -511,34 +352,4 @@ private static String parseStr(String str) { return str == null ? null : str.trim(); } - private static UserGroupInformation getUser(HttpServletRequest req) { - String remoteUser = req.getRemoteUser(); - UserGroupInformation callerUGI = null; - if (remoteUser != null) { - callerUGI = UserGroupInformation.createRemoteUser(remoteUser); - } - return callerUGI; - } - - private static void injectOwnerInfo(TimelineEntity timelineEntity, - String owner) throws YarnException { - if (timelineEntity.getPrimaryFilters() != null && - timelineEntity.getPrimaryFilters().containsKey( - TimelineStore.SystemFilter.ENTITY_OWNER.toString())) { - throw new YarnException( - "User should not use the timeline system filter key: " - + TimelineStore.SystemFilter.ENTITY_OWNER); - } - timelineEntity.addPrimaryFilter( - TimelineStore.SystemFilter.ENTITY_OWNER - .toString(), owner); - } - - private static void cleanupOwnerInfo(TimelineEntity timelineEntity) { - if (timelineEntity.getPrimaryFilters() != null) { - timelineEntity.getPrimaryFilters().remove( - TimelineStore.SystemFilter.ENTITY_OWNER.toString()); - } - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java index 3f3c08a55d..7b5b74b675 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java @@ -69,7 +69,7 @@ public void setup() { historyServer.init(config); historyServer.start(); store = - ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory()) + ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager()) .getHistoryStore(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java index 2096bbb206..549cfe1302 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AdminACLsManager; import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter; @@ -89,14 +90,15 @@ protected void configureServlets() { } catch (Exception e) { Assert.fail(); } - bind(TimelineStore.class).toInstance(store); Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false); timelineACLsManager = new TimelineACLsManager(conf); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); adminACLsManager = new AdminACLsManager(conf); - bind(TimelineACLsManager.class).toInstance(timelineACLsManager); + TimelineDataManager timelineDataManager = + new TimelineDataManager(store, timelineACLsManager); + bind(TimelineDataManager.class).toInstance(timelineDataManager); serve("/*").with(GuiceContainer.class); TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter(); FilterConfig filterConfig = mock(FilterConfig.class);