diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java index 163bd5c696..cf19328f67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java @@ -140,7 +140,15 @@ public void setCluster(String cluster) { } public Date getDate() { - return (Date)getInfo().get(DATE_INFO_KEY); + Object date = getInfo().get(DATE_INFO_KEY); + if (date != null) { + if (date instanceof Long) { + return new Date((Long)date); + } else if (date instanceof Date) { + return (Date)date; + } + } + return null; } public void setDate(long time) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index 83062f39c6..d82a40212c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -19,12 +19,18 @@ package org.apache.hadoop.yarn.server.timelineservice.reader; import java.io.IOException; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; 
import java.util.Collections; +import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TimeZone; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; @@ -54,6 +60,7 @@ import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.NotFoundException; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.Singleton; /** REST end point for Timeline Reader */ @@ -70,11 +77,96 @@ public class TimelineReaderWebServices { private static final String COMMA_DELIMITER = ","; private static final String COLON_DELIMITER = ":"; private static final String QUERY_STRING_SEP = "?"; + private static final String RANGE_DELIMITER = "-"; + private static final String DATE_PATTERN = "yyyyMMdd"; + + @VisibleForTesting + static ThreadLocal<DateFormat> DATE_FORMAT = new ThreadLocal<DateFormat>() { + @Override + protected DateFormat initialValue() { + SimpleDateFormat format = + new SimpleDateFormat(DATE_PATTERN, Locale.ENGLISH); + format.setTimeZone(TimeZone.getTimeZone("GMT")); + format.setLenient(false); + return format; + } + }; private void init(HttpServletResponse response) { response.setContentType(null); } + private static class DateRange { + Long dateStart; + Long dateEnd; + private DateRange(Long start, Long end) { + this.dateStart = start; + this.dateEnd = end; + } + } + + private static long parseDate(String strDate) throws ParseException { + Date date = DATE_FORMAT.get().parse(strDate); + return date.getTime(); + } + + /** + * Parses date range which can be a single date or in the format + * "[startdate]-[enddate]" where either of start or end date may not exist. + * @param dateRange + * @return a {@link DateRange} object. 
+ * @throws IllegalArgumentException + */ + private static DateRange parseDateRange(String dateRange) + throws IllegalArgumentException { + if (dateRange == null || dateRange.isEmpty()) { + return new DateRange(null, null); + } + // Split date range around "-" fetching two components indicating start and + // end date. + String[] dates = dateRange.split(RANGE_DELIMITER, 2); + Long start = null; + Long end = null; + try { + String startDate = dates[0].trim(); + if (!startDate.isEmpty()) { + // Start date is not in yyyyMMdd format. + if (startDate.length() != DATE_PATTERN.length()) { + throw new IllegalArgumentException("Invalid date range " + dateRange); + } + // Parse start date which exists before "-" in date range. + // If "-" does not exist in date range, this effectively + // gives single date. + start = parseDate(startDate); + } + if (dates.length > 1) { + String endDate = dates[1].trim(); + if (!endDate.isEmpty()) { + // End date is not in yyyyMMdd format. + if (endDate.length() != DATE_PATTERN.length()) { + throw new IllegalArgumentException( + "Invalid date range " + dateRange); + } + // Parse end date which exists after "-" in date range. + end = parseDate(endDate); + } + } else { + // Its a single date(without "-" in date range), so set + // end equal to start. + end = start; + } + if (start != null && end != null) { + if (start > end) { + throw new IllegalArgumentException("Invalid date range " + dateRange); + } + } + return new DateRange(start, end); + } catch (ParseException e) { + // Date could not be parsed. 
+ throw new IllegalArgumentException("Invalid date range " + dateRange); + } + } + + private static Set<String> parseValuesStr(String str, String delimiter) { if (str == null || str.isEmpty()) { return null; @@ -205,7 +297,8 @@ private static void handleException(Exception e, String url, long startTime, if (e instanceof NumberFormatException) { throw new BadRequestException(invalidNumMsg + " is not a numeric value."); } else if (e instanceof IllegalArgumentException) { - throw new BadRequestException("Requested Invalid Field."); + throw new BadRequestException(e.getMessage() == null ? + "Requested Invalid Field." : e.getMessage()); } else { LOG.error("Error while processing REST request", e); throw new WebApplicationException(e, @@ -514,8 +607,20 @@ public Set<TimelineEntity> getFlowRuns( } /** - * Return a list of flows for a given cluster id. Cluster ID is not - * provided by client so default cluster ID has to be taken. + * Return a list of flows. Cluster ID is not provided by client so default + * cluster ID has to be taken. daterange, if specified is given as + * "[startdate]-[enddate]"(i.e. start and end date separated by -) or + * single date. Dates are interpreted in yyyyMMdd format and are assumed to + * be in GMT. If a single date is specified, all flows active on that date are + * returned. If both startdate and enddate is given, all flows active between + * start and end date will be returned. If only startdate is given, flows + * active on and after startdate are returned. If only enddate is given, flows + * active on and before enddate are returned. + * For example : + * "daterange=20150711" returns flows active on 20150711. + * "daterange=20150711-20150714" returns flows active between these 2 dates. + * "daterange=20150711-" returns flows active on and after 20150711. + * "daterange=-20150711" returns flows active on and before 20150711. 
 */ @GET @Path("/flows/") @@ -524,12 +629,25 @@ public Set<TimelineEntity> getFlows( @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("limit") String limit, + @QueryParam("daterange") String dateRange, @QueryParam("fields") String fields) { - return getFlows(req, res, null, limit, fields); + return getFlows(req, res, null, limit, dateRange, fields); } /** - * Return a list of flows for a given cluster id. + * Return a list of flows for a given cluster id. daterange, if specified is + * given as "[startdate]-[enddate]"(i.e. start and end date separated by -) or + * single date. Dates are interpreted in yyyyMMdd format and are assumed to + * be in GMT. If a single date is specified, all flows active on that date are + * returned. If both startdate and enddate is given, all flows active between + * start and end date will be returned. If only startdate is given, flows + * active on and after startdate are returned. If only enddate is given, flows + * active on and before enddate are returned. + * For example : + * "daterange=20150711" returns flows active on 20150711. + * "daterange=20150711-20150714" returns flows active between these 2 dates. + * "daterange=20150711-" returns flows active on and after 20150711. + * "daterange=-20150711" returns flows active on and before 20150711. */ @GET @Path("/flows/{clusterid}/") @@ -539,6 +657,7 @@ public Set<TimelineEntity> getFlows( @Context HttpServletResponse res, @PathParam("clusterid") String clusterId, @QueryParam("limit") String limit, + @QueryParam("daterange") String dateRange, @QueryParam("fields") String fields) { String url = req.getRequestURI() + (req.getQueryString() == null ? 
"" : @@ -550,11 +669,12 @@ public Set<TimelineEntity> getFlows( TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); Set<TimelineEntity> entities = null; try { + DateRange range = parseDateRange(dateRange); entities = timelineReaderManager.getEntities( null, parseStr(clusterId), null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), parseLongStr(limit), - null, null, null, null, null, null, null, null, null, null, - parseFieldsStr(fields, COMMA_DELIMITER)); + range.dateStart, range.dateEnd, null, null, null, null, null, null, + null, null, parseFieldsStr(fields, COMMA_DELIMITER)); } catch (Exception e) { handleException(e, url, startTime, "limit"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java index 70a0915601..3e32128c45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java @@ -87,6 +87,12 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) if (limit == null || limit < 0) { limit = TimelineReader.DEFAULT_LIMIT; } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } } @Override @@ -100,7 +106,16 @@ protected Result getResult(Configuration hbaseConf, Connection conn) protected ResultScanner getResults(Configuration hbaseConf, Connection conn) throws IOException { Scan scan = new Scan(); 
- scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); + if (createdTimeBegin == DEFAULT_BEGIN_TIME && + createdTimeEnd == DEFAULT_END_TIME) { + scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); + } else { + scan.setStartRow( + FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd)); + scan.setStopRow( + FlowActivityRowKey.getRowKeyPrefix(clusterId, + (createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1)))); + } // use the page filter to limit the result to the page size // the scanner may still return more than the limit; therefore we need to // read the right number as we iterate diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index f7841e062d..fc1aa70c9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -55,10 +55,31 @@ public String getFlowId() { return flowId; } + /** + * Constructs a row key prefix for the flow activity table as follows: + * {@code clusterId!} + * + * @param clusterId + * @return byte array with the row key prefix + */ public static byte[] getRowKeyPrefix(String clusterId) { return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, "")); } + /** + * Constructs a row key prefix for the flow activity table as follows: + * {@code clusterId!dayTimestamp!} + * + * @param clusterId + * @param dayTs + * @return byte array with the row key 
prefix + */ + public static byte[] getRowKeyPrefix(String clusterId, long dayTs) { + return Separator.QUALIFIERS.join( + Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)), + Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)), new byte[0]); + } + /** * Constructs a row key for the flow activity table as follows: * {@code clusterId!dayTimestamp!user!flowId} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index f6a5090f3c..4f53fe24a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -27,6 +27,7 @@ import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; +import java.text.DateFormat; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.junit.After; import org.junit.AfterClass; @@ -70,6 +72,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { private 
TimelineReaderServer server; private static HBaseTestingUtility util; private static long ts = System.currentTimeMillis(); + private static long dayTs = + TimelineStorageUtils.getTopOfTheDayTimestamp(ts); @BeforeClass public static void setup() throws Exception { @@ -509,6 +513,61 @@ public void testGetFlows() throws Exception { entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); + + DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get(); + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flows/cluster1?daterange=" + fmt.format(dayTs) + "-" + + fmt.format(dayTs + (2*86400000L))); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); + assertNotNull(entities); + assertEquals(2, entities.size()); + for (FlowActivityEntity entity : entities) { + assertTrue((entity.getId().endsWith("@flow_name") && + entity.getFlowRuns().size() == 2) || + (entity.getId().endsWith("@flow_name2") && + entity.getFlowRuns().size() == 1)); + } + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flows/cluster1?daterange=" + + fmt.format(dayTs + (4*86400000L))); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); + assertNotNull(entities); + assertEquals(0, entities.size()); + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flows/cluster1?daterange=-" + + fmt.format(dayTs + (2*86400000L))); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); + assertNotNull(entities); + assertEquals(2, entities.size()); + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flows/cluster1?daterange=" + + fmt.format(dayTs - (2*86400000L)) + "-"); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); + assertNotNull(entities); + assertEquals(2, entities.size()); + + uri = URI.create("http://localhost:" + 
serverPort + "/ws/v2/" + + "timeline/flows/cluster1?daterange=20150711:20150714"); + verifyHttpResponse(client, uri, Status.BAD_REQUEST); + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flows/cluster1?daterange=20150714-20150711"); + verifyHttpResponse(client, uri, Status.BAD_REQUEST); + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flows/cluster1?daterange=2015071129-20150712"); + verifyHttpResponse(client, uri, Status.BAD_REQUEST); + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flows/cluster1?daterange=20150711-2015071243"); + verifyHttpResponse(client, uri, Status.BAD_REQUEST); } finally { client.destroy(); }