From f0e752c14b0b441955cdfc94f95937c90f1f055d Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Thu, 5 Mar 2015 15:03:30 -0800 Subject: [PATCH] YARN-3264. Created backing storage write interface and a POC only FS based storage implementation. Contributed by Vrushali C. (cherry picked from commit 821b68d05d246fd57d7b7286eb2ccc075ed1eae8) --- .../TimelineWriteResponse.java | 170 ++++++++++++++++++ .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../TestDistributedShell.java | 56 +++++- .../aggregator/TimelineAggregator.java | 43 +++-- .../storage/FileSystemTimelineWriterImpl.java | 144 +++++++++++++++ .../storage/TimelineAggregationTrack.java} | 11 +- .../storage/TimelineWriter.java | 66 +++++++ .../TestFileSystemTimelineWriterImpl.java | 79 ++++++++ 8 files changed, 555 insertions(+), 20 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/{test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java => main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java} (79%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java new file mode 100644 index 0000000000..82ecdbd99e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/** + * A class that holds a list of put errors. This is the response returned when a + * list of {@link TimelineEntity} objects is added to the timeline. If there are errors + * in storing individual entity objects, they will be indicated in the list of + * errors. + */ +@XmlRootElement(name = "response") +@XmlAccessorType(XmlAccessType.NONE) +@Public +@Unstable +public class TimelineWriteResponse { + + private List errors = new ArrayList(); + + public TimelineWriteResponse() { + + } + + /** + * Get a list of {@link TimelineWriteError} instances + * + * @return a list of {@link TimelineWriteError} instances + */ + @XmlElement(name = "errors") + public List getErrors() { + return errors; + } + + /** + * Add a single {@link TimelineWriteError} instance into the existing list + * + * @param error + * a single {@link TimelineWriteError} instance + */ + public void addError(TimelineWriteError error) { + errors.add(error); + } + + /** + * Add a list of {@link TimelineWriteError} instances into the existing list + * + * @param errors + * a list of {@link TimelineWriteError} instances + */ + public void addErrors(List errors) { + this.errors.addAll(errors); + } + + /** + * Set the list to the given list of {@link TimelineWriteError} instances + * + * @param errors + * a list of {@link TimelineWriteError} instances + */ + public void setErrors(List errors) { + this.errors.clear(); + this.errors.addAll(errors); + } + + /** + * A class that holds the error code for one entity. + */ + @XmlRootElement(name = "error") + @XmlAccessorType(XmlAccessType.NONE) + @Public + @Unstable + public static class TimelineWriteError { + + /** + * Error code returned if an IOException is encountered when storing an + * entity. + */ + public static final int IO_EXCEPTION = 1; + + private String entityId; + private String entityType; + private int errorCode; + + /** + * Get the entity Id + * + * @return the entity Id + */ + @XmlElement(name = "entity") + public String getEntityId() { + return entityId; + } + + /** + * Set the entity Id + * + * @param entityId + * the entity Id + */ + public void setEntityId(String entityId) { + this.entityId = entityId; + } + + /** + * Get the entity type + * + * @return the entity type + */ + @XmlElement(name = "entitytype") + public String getEntityType() { + return entityType; + } + + /** + * Set the entity type + * + * @param entityType + * the entity type + */ + public void setEntityType(String entityType) { + this.entityType = entityType; + } + + /** + * Get the error code + * + * @return an error code + */ + @XmlElement(name = "errorcode") + public int getErrorCode() { + return errorCode; + } + + /** + * Set the error code to the given error code + * + * @param errorCode + * an error code + */ + public void setErrorCode(int errorCode) { + this.errorCode = errorCode; + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c0241c9c56..43aafb8efb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1927,6 +1927,12 @@ public static boolean isAclEnabled(Configuration conf) { = TIMELINE_SERVICE_PREFIX + "entity-file.fs-support-append"; + /** + * Settings for timeline service v2.0 + */ + public static final String TIMELINE_SERVICE_WRITER_CLASS = + TIMELINE_SERVICE_PREFIX + "writer.class"; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 615df6758e..aa60808929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -55,6 +55,7 @@ import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -324,6 +326,7 @@ public void run() { boolean verified = false; String errorMessage = ""; + ApplicationId appId = null; while(!verified) { List apps = yarnClient.getApplications(); if (apps.size() == 0 ) { @@ -331,6 +334,7 @@ public void run() { continue; } ApplicationReport appReport = apps.get(0); + appId = appReport.getApplicationId(); if(appReport.getHost().equals("N/A")) { Thread.sleep(10); continue; @@ -373,7 +377,7 @@ public void run() { if (!isTestingTimelineV2) { checkTimelineV1(haveDomain); } else { - checkTimelineV2(haveDomain); + checkTimelineV2(haveDomain, appId); } } @@ -429,8 +433,54 @@ private void checkTimelineV1(boolean haveDomain) throws Exception { } } - private void checkTimelineV2(boolean haveDomain) { - // TODO check timeline V2 here after we have a storage layer + private void checkTimelineV2(boolean haveDomain, ApplicationId appId) { + // For PoC check in /tmp/ YARN-3264 + String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT; + + File tmpRootFolder = new File(tmpRoot); + Assert.assertTrue(tmpRootFolder.isDirectory()); + + // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs + String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // there will be at least one attempt, look for that file + String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp() + + "_000" + appId.getId() + "_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + String appAttemptFileName = outputDirApp + appTimestampFileName; + File appAttemptFile = new File(appAttemptFileName); + Assert.assertTrue(appAttemptFile.exists()); + + String outputDirContainer = tmpRoot + "/DS_CONTAINER/"; + File containerFolder = new File(outputDirContainer); + Assert.assertTrue(containerFolder.isDirectory()); + + String containerTimestampFileName = "container_" + + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_01_000002.thist"; + String containerFileName = outputDirContainer + containerTimestampFileName; + File containerFile = new File(containerFileName); + Assert.assertTrue(containerFile.exists()); + String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId() + + "_"; + deleteAppFiles(new File(outputDirApp), appTimeStamp); + deleteAppFiles(new File(outputDirContainer), appTimeStamp); + tmpRootFolder.delete(); + } + + private void deleteAppFiles(File rootDir, String appTimeStamp) { + boolean deleted = false; + File[] listOfFiles = rootDir.listFiles(); + for (File f1 : listOfFiles) { + // list all attempts for this app and delete them + if (f1.getName().contains(appTimeStamp)){ + deleted = f1.delete(); + Assert.assertTrue(deleted); + } + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java index 42277124a3..dbd089534c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -26,12 +28,16 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; - +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.util.ReflectionUtils; /** * Service that handles writes to the timeline service and writes them to the * backing storage. * - * Classes that extend this can putIfAbsent their own lifecycle management or + * Classes that extend this can add their own lifecycle management or * customization of request handling. */ @Private @@ -39,6 +45,8 @@ public abstract class TimelineAggregator extends CompositeService { private static final Log LOG = LogFactory.getLog(TimelineAggregator.class); + private TimelineWriter writer; + public TimelineAggregator(String name) { super(name); } @@ -46,6 +54,11 @@ public TimelineAggregator(String name) { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + writer = ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, + TimelineWriter.class), conf); + writer.init(conf); } @Override @@ -56,6 +69,11 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { super.serviceStop(); + writer.stop(); + } + + public TimelineWriter getWriter() { + return writer; } /** @@ -69,20 +87,17 @@ protected void serviceStop() throws Exception { * * @param entities entities to post * @param callerUgi the caller UGI + * @return the response that contains the result of the post. */ - public void postEntities(TimelineEntities entities, - UserGroupInformation callerUgi) { - // Add this output temporarily for our prototype - // TODO remove this after we have an actual implementation - LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE"); - LOG.info("postEntities(entities=" + entities + ", callerUgi=" + - callerUgi + ")"); - - // TODO implement + public TimelineWriteResponse postEntities(TimelineEntities entities, + UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + - callerUgi + ")"); + LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); + LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); } + + return writer.write(entities); } /** @@ -104,4 +119,4 @@ public void postEntitiesAsync(TimelineEntities entities, callerUgi + ")"); } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java new file mode 100644 index 0000000000..4a57e9768c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +/** + * This implements a local file based backend for storing application timeline + * information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class FileSystemTimelineWriterImpl extends AbstractService + implements TimelineWriter { + + private String outputRoot; + + /** Config param for timeline service storage tmp root for FILE YARN-3264 */ + public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT + = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; + + /** default value for storage location on local disk */ + public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT + = "/tmp/timeline_service_data/"; + + /** Default extension for output files */ + public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; + + FileSystemTimelineWriterImpl() { + super((FileSystemTimelineWriterImpl.class.getName())); + } + + /** + * Stores the entire information in {@link TimelineEntity} to the + * timeline store. Any errors occurring for individual write request objects + * will be reported in the response. + * + * @param data + * a {@link TimelineEntity} object + * @return {@link TimelineWriteResponse} object. + * @throws IOException + */ + @Override + public TimelineWriteResponse write(TimelineEntities entities) + throws IOException { + TimelineWriteResponse response = new TimelineWriteResponse(); + for (TimelineEntity entity : entities.getEntities()) { + write(entity, response); + } + return response; + } + + private void write(TimelineEntity entity, + TimelineWriteResponse response) throws IOException { + PrintWriter out = null; + try { + File outputDir = new File(outputRoot + entity.getType()); + String fileName = outputDir + "/" + entity.getId() + + TIMELINE_SERVICE_STORAGE_EXTENSION; + if (!outputDir.exists()) { + if (!outputDir.mkdirs()) { + throw new IOException("Could not create directories for " + fileName); + } + } + out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true))); + out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); + out.write("\n"); + } catch (IOException ioe) { + TimelineWriteError error = new TimelineWriteError(); + error.setEntityId(entity.getId()); + error.setEntityType(entity.getType()); + /* + * TODO: set an appropriate error code after PoC could possibly be: + * error.setErrorCode(TimelineWriteError.IO_EXCEPTION); + */ + response.addError(error); + } finally { + if (out != null) { + out.close(); + } + } + } + + /** + * Aggregates the entity information to the timeline store based on which + * track this entity is to be rolled up to The tracks along which aggregations + * are to be done are given by {@link TimelineAggregationTrack} + * + * Any errors occurring for individual write request objects will be reported + * in the response. + * + * @param data + * a {@link TimelineEntity} object + * a {@link TimelineAggregationTrack} enum value + * @return a {@link TimelineWriteResponse} object. + * @throws IOException + */ + public TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + + } + + public String getOutputRoot() { + return outputRoot; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, + DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java similarity index 79% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java index 821e455f97..955ca80d11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java @@ -16,8 +16,13 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -public class TestTimelineAggregator { +package org.apache.hadoop.yarn.server.timelineservice.storage; +/** + * specifies the tracks along which an entity + * info is to be aggregated on + * + */ +public enum TimelineAggregationTrack { + FLOW, USER, QUEUE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java new file mode 100644 index 0000000000..71ad7abc5e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.service.Service; + +/** + * This interface is for storing application timeline information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface TimelineWriter extends Service { + + /** + * Stores the entire information in {@link TimelineEntities} to the + * timeline store. Any errors occurring for individual write request objects + * will be reported in the response. + * + * @param data + * a {@link TimelineEntities} object. + * @return a {@link TimelineWriteResponse} object. + * @throws IOException + */ + TimelineWriteResponse write(TimelineEntities data) throws IOException; + + /** + * Aggregates the entity information to the timeline store based on which + * track this entity is to be rolled up to The tracks along which aggregations + * are to be done are given by {@link TimelineAggregationTrack} + * + * Any errors occurring for individual write request objects will be reported + * in the response. + * + * @param data + * a {@link TimelineEntity} object + * a {@link TimelineAggregationTrack} enum + * value. + * @return a {@link TimelineWriteResponse} object. + * @throws IOException + */ + TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java new file mode 100644 index 0000000000..f720454609 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.Test; +import org.apache.commons.io.FileUtils; + +public class TestFileSystemTimelineWriterImpl { + + /** + * Unit test for PoC YARN 3264 + * @throws Exception + */ + @Test + public void testWriteEntityToFile() throws Exception { + String name = "unit_test_BaseAggregator_testWriteEntityToFile_" + + Long.toString(System.currentTimeMillis()); + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "hello"; + String type = "world"; + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(1425016501000L); + entity.setModifiedTime(1425016502000L); + te.addEntity(entity); + + FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl(); + fsi.serviceInit(new Configuration()); + fsi.write(te); + + String fileName = fsi.getOutputRoot() + "/" + type + "/" + id + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + Path path = Paths.get(fileName); + File f = new File(fileName); + assertTrue(f.exists() && !f.isDirectory()); + List data = Files.readAllLines(path, StandardCharsets.UTF_8); + // ensure there's only one entity + 1 new line + assertTrue(data.size() == 2); + String d = data.get(0); + // confirm the contents same as what was written + assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); + + // delete the directory + File outputDir = new File(fsi.getOutputRoot()); + FileUtils.deleteDirectory(outputDir); + assertTrue(!(f.exists())); + } +}