YARN-3040. Make putEntities operation be aware of the app's context. Contributed by Zhijie Shen

Junping Du 2015-03-26 09:59:32 -07:00 committed by Sangjin Lee
parent 5e3d9a477b
commit d67c9bdb4d
36 changed files with 960 additions and 139 deletions

View File

@ -133,6 +133,7 @@ private static void addDeprecatedKeys() {
public static final String RM_PREFIX = "yarn.resourcemanager.";
public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id";
public static final String DEFAULT_RM_CLUSTER_ID = "yarn_cluster";
public static final String RM_HOSTNAME = RM_PREFIX + "hostname";

View File

@ -22,8 +22,10 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import org.apache.commons.cli.CommandLine;
@ -187,6 +189,9 @@ public class Client {
// Timeline domain writer access control
private String modifyACLs = null;
private String flowId = null;
private String flowRunId = null;
// Command line options
private Options opts;
@ -260,7 +265,8 @@ public Client(Configuration conf) throws Exception {
opts.addOption("shell_args", true, "Command line args for the shell script." +
"Multiple args can be separated by empty space.");
opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("shell_env", true,
"Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
@ -287,6 +293,10 @@ public Client(Configuration conf) throws Exception {
+ "modify the timeline entities in the given domain");
opts.addOption("create", false, "Flag to indicate whether to create the "
+ "domain specified with -domain.");
opts.addOption("flow", true, "ID of the flow which the distributed shell "
+ "app belongs to");
opts.addOption("flow_run", true, "ID of the flowrun which the distributed "
+ "shell app belongs to");
opts.addOption("help", false, "Print usage");
opts.addOption("node_label_expression", true,
"Node label expression to determine the nodes"
@ -476,6 +486,12 @@ public boolean init(String[] args) throws ParseException {
+ cliParser.getOptionValue("container_retry_interval"));
}
if (cliParser.hasOption("flow")) {
flowId = cliParser.getOptionValue("flow");
}
if (cliParser.hasOption("flow_run")) {
flowRunId = cliParser.getOptionValue("flow_run");
}
return true;
}
@ -567,6 +583,15 @@ public boolean run() throws IOException, YarnException {
.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
Set<String> tags = new HashSet<String>();
if (flowId != null) {
tags.add(TimelineUtils.generateFlowIdTag(flowId));
}
if (flowRunId != null) {
tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
}
appContext.setApplicationTags(tags);
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources

View File

@ -38,6 +38,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -81,6 +82,7 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -263,6 +265,21 @@ public void testDSShellWithoutDomainV2() throws Exception {
}
public void testDSShell(boolean haveDomain) throws Exception {
testDSShell(haveDomain, true);
}
@Test(timeout=90000)
public void testDSShellWithoutDomainV2DefaultFlow() throws Exception {
testDSShell(false, true);
}
@Test(timeout=90000)
public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception {
testDSShell(false, false);
}
public void testDSShell(boolean haveDomain, boolean defaultFlow)
throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
@ -299,6 +316,15 @@ public void testDSShell(boolean haveDomain) throws Exception {
};
isTestingTimelineV2 = true;
args = mergeArgs(args, timelineArgs);
if (!defaultFlow) {
String[] flowArgs = {
"--flow",
"test_flow_id",
"--flow_run",
"12345678"
};
args = mergeArgs(args, flowArgs);
}
LOG.info("Setup: Using timeline v2!");
}
@ -377,7 +403,7 @@ public void run() {
if (!isTestingTimelineV2) {
checkTimelineV1(haveDomain);
} else {
checkTimelineV2(haveDomain, appId);
checkTimelineV2(haveDomain, appId, defaultFlow);
}
}
@ -433,53 +459,58 @@ private void checkTimelineV1(boolean haveDomain) throws Exception {
}
}
private void checkTimelineV2(boolean haveDomain, ApplicationId appId) {
// For PoC check in /tmp/ YARN-3264
String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
private void checkTimelineV2(
boolean haveDomain, ApplicationId appId, boolean defaultFlow)
throws Exception {
// For PoC check in /tmp/timeline_service_data YARN-3264
String tmpRoot =
FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ "/entities/";
File tmpRootFolder = new File(tmpRoot);
Assert.assertTrue(tmpRootFolder.isDirectory());
try {
Assert.assertTrue(tmpRootFolder.isDirectory());
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/";
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
String outputDirApp = tmpRoot +
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
UserGroupInformation.getCurrentUser().getShortUserName() +
(defaultFlow ? "/" +
TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
"/0/" : "/test_flow_id/12345678/") +
appId.toString() + "/DS_APP_ATTEMPT/";
File entityFolder = new File(outputDirApp);
Assert.assertTrue(entityFolder.isDirectory());
File entityFolder = new File(outputDirApp);
Assert.assertTrue(entityFolder.isDirectory());
// there will be at least one attempt, look for that file
String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
+ "_000" + appId.getId() + "_000001"
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String appAttemptFileName = outputDirApp + appTimestampFileName;
File appAttemptFile = new File(appAttemptFileName);
Assert.assertTrue(appAttemptFile.exists());
// there will be at least one attempt, look for that file
String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
+ "_000" + appId.getId() + "_000001"
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String appAttemptFileName = outputDirApp + appTimestampFileName;
File appAttemptFile = new File(appAttemptFileName);
Assert.assertTrue(appAttemptFile.exists());
String outputDirContainer = tmpRoot + "/DS_CONTAINER/";
File containerFolder = new File(outputDirContainer);
Assert.assertTrue(containerFolder.isDirectory());
String outputDirContainer = tmpRoot +
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
UserGroupInformation.getCurrentUser().getShortUserName() +
(defaultFlow ? "/" +
TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
"/0/" : "/test_flow_id/12345678/") +
appId.toString() + "/DS_CONTAINER/";
File containerFolder = new File(outputDirContainer);
Assert.assertTrue(containerFolder.isDirectory());
String containerTimestampFileName = "container_"
+ appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000002.thist";
String containerFileName = outputDirContainer + containerTimestampFileName;
File containerFile = new File(containerFileName);
Assert.assertTrue(containerFile.exists());
String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+ "_";
deleteAppFiles(new File(outputDirApp), appTimeStamp);
deleteAppFiles(new File(outputDirContainer), appTimeStamp);
tmpRootFolder.delete();
}
private void deleteAppFiles(File rootDir, String appTimeStamp) {
boolean deleted = false;
File[] listOfFiles = rootDir.listFiles();
for (File f1 : listOfFiles) {
// list all attempts for this app and delete them
if (f1.getName().contains(appTimeStamp)){
deleted = f1.delete();
Assert.assertTrue(deleted);
}
String containerTimestampFileName = "container_"
+ appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000002.thist";
String containerFileName = outputDirContainer + containerTimestampFileName;
File containerFile = new File(containerFileName);
Assert.assertTrue(containerFile.exists());
String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+ "_";
} finally {
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
@ -43,6 +44,9 @@
@Evolving
public class TimelineUtils {
public static final String FLOW_ID_TAG_PREFIX = "TIMELINE_FLOW_ID_TAG";
public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG";
private static ObjectMapper mapper;
static {
@ -154,4 +158,16 @@ public static Text buildTimelineTokenService(Configuration conf) {
getTimelineTokenServiceAddress(conf);
return SecurityUtil.buildTokenService(timelineServiceAddr);
}
public static String generateDefaultFlowIdBasedOnAppId(ApplicationId appId) {
return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
}
public static String generateFlowIdTag(String flowId) {
return FLOW_ID_TAG_PREFIX + ":" + flowId;
}
public static String generateFlowRunIdTag(String flowRunId) {
return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId;
}
}
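Taken together, the new helpers fix the tag format that the client, the RM, and the collector agree on. Below is a minimal usage sketch; the flow values are hypothetical, and only the helper names come from this patch.

// Hypothetical values; only the TimelineUtils helpers are from the patch.
ApplicationId appId = ApplicationId.newInstance(1427385678901L, 1);
String flowTag = TimelineUtils.generateFlowIdTag("frequent_etl");
// flowTag == "TIMELINE_FLOW_ID_TAG:frequent_etl"
String flowRunTag = TimelineUtils.generateFlowRunIdTag("12345678");
// flowRunTag == "TIMELINE_FLOW_RUN_ID_TAG:12345678"
// An app submitted without flow tags falls back to an app-ID-derived flow:
String defaultFlow = TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId);
// defaultFlow == "flow_1427385678901_1"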

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
@ -54,4 +56,18 @@ ReportNewCollectorInfoResponse reportNewCollectorInfo(
ReportNewCollectorInfoRequest request)
throws YarnException, IOException;
/**
* <p>
* The collector needs to get the context information, including the user,
* flow and flow run ID, to associate with every incoming put-entity request.
* </p>
* @param request the request for the timeline collector context of the
* given application
* @return the response carrying the user, flow and flow run ID of the
* given application
* @throws YarnException if the context of the application cannot be retrieved
* @throws IOException if an RPC error occurs
*/
GetTimelineCollectorContextResponse getTimelineCollectorContext(
GetTimelineCollectorContextRequest request)
throws YarnException, IOException;
}
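A caller-side sketch of the new RPC follows; it assumes "proxy" is an already-connected CollectorNodemanagerProtocol client (such as the PB client implementation below) and "appId" is in scope.

// Sketch only; "proxy" and "appId" are assumed to exist.
GetTimelineCollectorContextRequest request =
GetTimelineCollectorContextRequest.newInstance(appId);
GetTimelineCollectorContextResponse response =
proxy.getTimelineCollectorContext(request);
String userId = response.getUserId(); // the app submitter
String flowId = response.getFlowId(); // the flow the app belongs to
String flowRunId = response.getFlowRunId(); // the run of that flow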

View File

@ -30,11 +30,16 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
@ -84,6 +89,21 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
}
}
@Override
public GetTimelineCollectorContextResponse getTimelineCollectorContext(
GetTimelineCollectorContextRequest request)
throws YarnException, IOException {
GetTimelineCollectorContextRequestProto requestProto =
((GetTimelineCollectorContextRequestPBImpl) request).getProto();
try {
return new GetTimelineCollectorContextResponsePBImpl(
proxy.getTimelineCollectorContext(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
@Override
public void close() {
if (this.proxy != null) {

View File

@ -20,11 +20,16 @@
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
@ -56,4 +61,20 @@ public ReportNewCollectorInfoResponseProto reportNewCollectorInfo(
}
}
@Override
public GetTimelineCollectorContextResponseProto getTimelineCollectorContext(
RpcController controller,
GetTimelineCollectorContextRequestProto proto) throws ServiceException {
GetTimelineCollectorContextRequestPBImpl request =
new GetTimelineCollectorContextRequestPBImpl(proto);
try {
GetTimelineCollectorContextResponse response =
real.getTimelineCollectorContext(request);
return ((GetTimelineCollectorContextResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Records;
public abstract class GetTimelineCollectorContextRequest {
public static GetTimelineCollectorContextRequest newInstance(
ApplicationId appId) {
GetTimelineCollectorContextRequest request =
Records.newRecord(GetTimelineCollectorContextRequest.class);
request.setApplicationId(appId);
return request;
}
public abstract ApplicationId getApplicationId();
public abstract void setApplicationId(ApplicationId appId);
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.util.Records;
public abstract class GetTimelineCollectorContextResponse {
public static GetTimelineCollectorContextResponse newInstance(
String userId, String flowId, String flowRunId) {
GetTimelineCollectorContextResponse response =
Records.newRecord(GetTimelineCollectorContextResponse.class);
response.setUserId(userId);
response.setFlowId(flowId);
response.setFlowRunId(flowRunId);
return response;
}
public abstract String getUserId();
public abstract void setUserId(String userId);
public abstract String getFlowId();
public abstract void setFlowId(String flowId);
public abstract String getFlowRunId();
public abstract void setFlowRunId(String flowRunId);
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
public class GetTimelineCollectorContextRequestPBImpl extends
GetTimelineCollectorContextRequest {
GetTimelineCollectorContextRequestProto
proto = GetTimelineCollectorContextRequestProto.getDefaultInstance();
GetTimelineCollectorContextRequestProto.Builder builder = null;
boolean viaProto = false;
private ApplicationId appId = null;
public GetTimelineCollectorContextRequestPBImpl() {
builder = GetTimelineCollectorContextRequestProto.newBuilder();
}
public GetTimelineCollectorContextRequestPBImpl(
GetTimelineCollectorContextRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public GetTimelineCollectorContextRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void mergeLocalToBuilder() {
if (appId != null) {
builder.setAppId(convertToProtoFormat(this.appId));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetTimelineCollectorContextRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public ApplicationId getApplicationId() {
if (this.appId != null) {
return this.appId;
}
GetTimelineCollectorContextRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasAppId()) {
return null;
}
this.appId = convertFromProtoFormat(p.getAppId());
return this.appId;
}
@Override
public void setApplicationId(ApplicationId appId) {
maybeInitBuilder();
if (appId == null)
builder.clearAppId();
this.appId = appId;
}
private ApplicationIdPBImpl convertFromProtoFormat(YarnProtos.ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private YarnProtos.ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
public class GetTimelineCollectorContextResponsePBImpl extends
GetTimelineCollectorContextResponse {
GetTimelineCollectorContextResponseProto proto =
GetTimelineCollectorContextResponseProto.getDefaultInstance();
GetTimelineCollectorContextResponseProto.Builder builder = null;
boolean viaProto = false;
public GetTimelineCollectorContextResponsePBImpl() {
builder = GetTimelineCollectorContextResponseProto.newBuilder();
}
public GetTimelineCollectorContextResponsePBImpl(
GetTimelineCollectorContextResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public GetTimelineCollectorContextResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetTimelineCollectorContextResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public String getUserId() {
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasUserId()) {
return null;
}
return p.getUserId();
}
@Override
public void setUserId(String userId) {
maybeInitBuilder();
if (userId == null) {
builder.clearUserId();
return;
}
builder.setUserId(userId);
}
@Override
public String getFlowId() {
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasFlowId()) {
return null;
}
return p.getFlowId();
}
@Override
public void setFlowId(String flowId) {
maybeInitBuilder();
if (flowId == null) {
builder.clearFlowId();
return;
}
builder.setFlowId(flowId);
}
@Override
public String getFlowRunId() {
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasFlowRunId()) {
return null;
}
return p.getFlowRunId();
}
@Override
public void setFlowRunId(String flowRunId) {
maybeInitBuilder();
if (flowRunId == null) {
builder.clearFlowRunId();
return;
}
builder.setFlowRunId(flowRunId);
}
}

View File

@ -26,4 +26,5 @@ import "yarn_server_common_service_protos.proto";
service CollectorNodemanagerProtocolService {
rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto);
rpc getTimelineCollectorContext (GetTimelineCollectorContextRequestProto) returns (GetTimelineCollectorContextResponseProto);
}

View File

@ -140,6 +140,15 @@ message ReportNewCollectorInfoRequestProto {
message ReportNewCollectorInfoResponseProto {
}
message GetTimelineCollectorContextRequestProto {
optional ApplicationIdProto appId = 1;
}
message GetTimelineCollectorContextResponseProto {
optional string user_id = 1;
optional string flow_id = 2;
optional string flow_run_id = 3;
}
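On the wire, the exchange is an app ID in and three optional strings out. A sketch of building the response with the protoc-generated builder (the setter names follow protobuf's camel-casing of the field names; all values are hypothetical):

GetTimelineCollectorContextResponseProto proto =
GetTimelineCollectorContextResponseProto.newBuilder()
.setUserId("alice") // hypothetical
.setFlowId("frequent_etl") // hypothetical
.setFlowRunId("12345678") // hypothetical
.build();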
message NMContainerStatusProto {
optional ContainerIdProto container_id = 1;

View File

@ -64,6 +64,8 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
@ -170,6 +172,31 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
}
// Verify request with a valid app ID
try {
GetTimelineCollectorContextRequest request =
GetTimelineCollectorContextRequest.newInstance(
ApplicationId.newInstance(0, 1));
GetTimelineCollectorContextResponse response =
proxy.getTimelineCollectorContext(request);
Assert.assertEquals("test_user_id", response.getUserId());
Assert.assertEquals("test_flow_id", response.getFlowId());
Assert.assertEquals("test_flow_run_id", response.getFlowRunId());
} catch (YarnException | IOException e) {
Assert.fail("RPC call failured is not expected here.");
}
// Verify request with an invalid app ID
try {
GetTimelineCollectorContextRequest request =
GetTimelineCollectorContextRequest.newInstance(
ApplicationId.newInstance(0, 2));
proxy.getTimelineCollectorContext(request);
Assert.fail("RPC call failured is expected here.");
} catch (YarnException | IOException e) {
Assert.assertTrue(e instanceof YarnException);
Assert.assertTrue(e.getMessage().contains("The application is not found."));
}
server.stop();
}
@ -358,6 +385,18 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
recordFactory.newRecordInstance(ReportNewCollectorInfoResponse.class);
return response;
}
@Override
public GetTimelineCollectorContextResponse getTimelineCollectorContext(
GetTimelineCollectorContextRequest request)
throws YarnException, IOException {
if (request.getApplicationId().getId() == 1) {
return GetTimelineCollectorContextResponse.newInstance(
"test_user_id", "test_flow_id", "test_flow_run_id");
} else {
throw new YarnException("The application is not found.");
}
}
}
}

View File

@ -30,13 +30,17 @@
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
public class NMCollectorService extends CompositeService implements
CollectorNodemanagerProtocol {
@ -93,7 +97,7 @@ public void serviceStop() throws Exception {
@Override
public ReportNewCollectorInfoResponse reportNewCollectorInfo(
ReportNewCollectorInfoRequest request) throws IOException {
ReportNewCollectorInfoRequest request) throws YarnException, IOException {
List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList();
if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
Map<ApplicationId, String> newCollectorsMap =
@ -107,4 +111,16 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
return ReportNewCollectorInfoResponse.newInstance();
}
@Override
public GetTimelineCollectorContextResponse getTimelineCollectorContext(
GetTimelineCollectorContextRequest request)
throws YarnException, IOException {
Application app = context.getApplications().get(request.getApplicationId());
if (app == null) {
throw new YarnException("Application " + request.getApplicationId() +
" doesn't exist on NM.");
}
return GetTimelineCollectorContextResponse.newInstance(
app.getUser(), app.getFlowId(), app.getFlowRunId());
}
}

View File

@ -146,10 +146,11 @@
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerManagerImpl extends CompositeService implements
@ -331,8 +332,10 @@ private void recoverApplication(ContainerManagerApplicationProto p)
}
LOG.info("Recovering application " + appId);
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context, p.getAppLogAggregationInitedTime());
//TODO: Recover flow and flow run ID
ApplicationImpl app = new ApplicationImpl(
dispatcher, p.getUser(), null, null, appId, creds, context, p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
@ -951,8 +954,12 @@ protected void startContainerInternal(
try {
if (!isServiceStopped()) {
// Create the application
Application application = new ApplicationImpl(dispatcher, user,
applicationID, credentials, context);
String flowId = launchContext.getEnvironment().get(
TimelineUtils.FLOW_ID_TAG_PREFIX);
String flowRunId = launchContext.getEnvironment().get(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
Application application = new ApplicationImpl(
dispatcher, user, flowId, flowRunId, applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app "

View File

@ -35,4 +35,8 @@ public interface Application extends EventHandler<ApplicationEvent> {
ApplicationState getApplicationState();
String getFlowId();
String getFlowRunId();
}

View File

@ -71,6 +71,8 @@ public class ApplicationImpl implements Application {
final Dispatcher dispatcher;
final String user;
final String flowId;
final String flowRunId;
final ApplicationId appId;
final Credentials credentials;
Map<ApplicationAccessType, String> applicationACLs;
@ -95,11 +97,14 @@ public class ApplicationImpl implements Application {
private long applicationLogInitedTimestamp = -1;
private final NMStateStoreService appStateStore;
public ApplicationImpl(Dispatcher dispatcher, String user,
ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
String flowRunId, ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
this.dispatcher = dispatcher;
this.user = user;
this.flowId = flowId;
this.flowRunId = flowRunId;
this.appId = appId;
this.credentials = credentials;
this.aclsManager = context.getApplicationACLsManager();
@ -112,9 +117,11 @@ public ApplicationImpl(Dispatcher dispatcher, String user,
setAppLogInitedTimestamp(recoveredLogInitedTime);
}
public ApplicationImpl(Dispatcher dispatcher, String user,
ApplicationId appId, Credentials credentials, Context context) {
this(dispatcher, user, appId, credentials, context, -1);
public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
String flowRunId, ApplicationId appId, Credentials credentials,
Context context) {
this(dispatcher, user, flowId, flowRunId, appId, credentials,
context, -1);
}
@Override
@ -559,4 +566,12 @@ public LogAggregationContext getLogAggregationContext() {
this.readLock.unlock();
}
}
public String getFlowId() {
return flowId;
}
public String getFlowRunId() {
return flowRunId;
}
}

View File

@ -550,7 +550,8 @@ private class WrappedApplication {
this.user = user;
this.appId = BuilderUtils.newApplicationId(timestamp, id);
app = new ApplicationImpl(dispatcher, this.user, appId, null, context);
app = new ApplicationImpl(
dispatcher, this.user, null, null, appId, null, context);
containers = new ArrayList<Container>();
for (int i = 0; i < numContainers; i++) {
Container container = createMockedContainer(this.appId, i);

View File

@ -39,6 +39,9 @@ public class MockApp implements Application {
Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
ApplicationState appState;
Application app;
String flowId;
String flowRunId;
public MockApp(int uniqId) {
this("mockUser", 1234, uniqId);
@ -77,4 +80,11 @@ public ApplicationState getApplicationState() {
public void handle(ApplicationEvent event) {}
public String getFlowId() {
return flowId;
}
public String getFlowRunId() {
return flowRunId;
}
}

View File

@ -339,7 +339,7 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
final String filename = "logfile1";
final String logMessage = "log message\n";
nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
appId, null, nmContext));
null, null, appId, null, nmContext));
MockContainer container = new MockContainer(appAttemptId,
new AsyncDispatcher(), new Configuration(), "user", appId, 1);

View File

@ -34,8 +34,8 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
@ -228,6 +229,26 @@ protected void setupTokens(
.get(applicationId)
.getSubmitTime()));
// Set flow context info
for (String tag :
rmContext.getRMApps().get(applicationId).getApplicationTags()) {
if (tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX + ":") ||
tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX.toLowerCase() + ":")) {
String value = tag.substring(
TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1);
if (!value.isEmpty()) {
environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, value);
}
}
if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
String value = tag.substring(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
if (!value.isEmpty()) {
environment.put(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, value);
}
}
}
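YARN may normalize application tags to lower case on submission, which is why the loop above checks both spellings of each prefix. A hedged sketch of the round trip from client tag to AM environment, using a hypothetical flow ID ("environment" is the AM container environment map populated above):

// Client side: tag the application submission context.
String tag = TimelineUtils.generateFlowIdTag("test_flow_id");
// tag == "TIMELINE_FLOW_ID_TAG:test_flow_id" (possibly lower-cased by YARN)
// RM side (the loop above): strip the prefix and export the value to the
// AM container environment, keyed by the prefix itself.
String value = tag.substring(TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1);
environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, value); // "test_flow_id"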
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
ByteBuffer tokens = container.getTokens();

View File

@ -164,7 +164,7 @@ public void appACLsUpdated(RMApp app, String appViewACLs,
@SuppressWarnings("unchecked")
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ApplicaitonStateUpdatedEvent(
app.getApplicationId(),

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
/**
* This class is responsible for posting application and appattempt lifecycle
@ -87,6 +88,12 @@ protected void handleSystemMetricsEvent(SystemMetricsEvent event) {
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
@Override
protected TimelineCollectorContext getTimelineEntityContext() {
// TODO address in YARN-3390.
return null;
}
/**
* EventHandler implementation which forward events to SystemMetricsPublisher.

View File

@ -20,20 +20,27 @@
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
public class TestTimelineServiceClientIntegration {
private static TimelineCollectorManager collectorManager;
private static PerNodeTimelineCollectorsAuxService auxService;
@ -86,7 +93,17 @@ public MyTimelineCollectorManager() {
@Override
protected CollectorNodemanagerProtocol getNMCollectorService() {
return mock(CollectorNodemanagerProtocol.class);
CollectorNodemanagerProtocol protocol =
mock(CollectorNodemanagerProtocol.class);
try {
GetTimelineCollectorContextResponse response =
GetTimelineCollectorContextResponse.newInstance(null, null, null);
when(protocol.getTimelineCollectorContext(any(
GetTimelineCollectorContextRequest.class))).thenReturn(response);
} catch (YarnException | IOException e) {
fail();
}
return protocol;
}
}
}

View File

@ -18,9 +18,14 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* Service that handles writes to the timeline service and writes them to the
@ -31,16 +36,29 @@
@Private
@Unstable
public class AppLevelTimelineCollector extends TimelineCollector {
private final String applicationId;
// TODO define key metadata such as flow metadata, user, and queue
private final ApplicationId appId;
private final TimelineCollectorContext context;
public AppLevelTimelineCollector(String applicationId) {
super(AppLevelTimelineCollector.class.getName() + " - " + applicationId);
this.applicationId = applicationId;
public AppLevelTimelineCollector(ApplicationId appId) {
super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
Preconditions.checkNotNull(appId, "AppId shouldn't be null");
this.appId = appId;
context = new TimelineCollectorContext();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
YarnConfiguration.DEFAULT_RM_CLUSTER_ID));
// Set the default values, which will be updated with an RPC call to get
// the context info from the NM.
// The current user usually is not the app user, but keep this field non-null
context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
// Use the app ID to generate a default flow ID for an orphan app
context.setFlowId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId));
// Set the flow run ID to 0 if it's an orphan app
context.setFlowRunId("0");
context.setAppId(appId.toString());
super.serviceInit(conf);
}
@ -54,4 +72,9 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
@Override
protected TimelineCollectorContext getTimelineEntityContext() {
return context;
}
}
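For an app that arrives with no flow information (an "orphan" app), serviceInit above leaves the context fully populated with defaults. A worked example of those defaults, assuming the default cluster ID and a hypothetical app; getTimelineEntityContext is protected, so this sketch only works from the same package.

// Hypothetical orphan app; the expected defaults follow serviceInit() above.
ApplicationId appId = ApplicationId.newInstance(1427385678901L, 1);
AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId);
collector.init(new YarnConfiguration());
TimelineCollectorContext ctx = collector.getTimelineEntityContext();
// ctx.getClusterId() == "yarn_cluster" (DEFAULT_RM_CLUSTER_ID)
// ctx.getUserId() == the current user's short name, until the NM-provided
// context overrides it
// ctx.getFlowId() == "flow_1427385678901_1"
// ctx.getFlowRunId() == "0"
// ctx.getAppId() == "application_1427385678901_0001"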

View File

@ -95,7 +95,7 @@ protected void serviceStop() throws Exception {
*/
public boolean addApplication(ApplicationId appId) {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(appId.toString());
new AppLevelTimelineCollector(appId);
return (collectorManager.putIfAbsent(appId, collector)
== collector);
}

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
/**
* Service that handles writes to the timeline service and writes them to the
* backing storage.
@ -83,21 +84,24 @@ public TimelineWriter getWriter() {
*
* This method should be reserved for selected critical entities and events.
* For normal voluminous writes one should use the async method
* {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
* {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}.
*
* @param entities entities to post
* @param callerUgi the caller UGI
* @return the response that contains the result of the post.
*/
public TimelineWriteResponse postEntities(TimelineEntities entities,
public TimelineWriteResponse putEntities(TimelineEntities entities,
UserGroupInformation callerUgi) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
LOG.debug("postEntities(entities=" + entities + ", callerUgi="
LOG.debug("putEntities(entities=" + entities + ", callerUgi="
+ callerUgi + ")");
}
return writer.write(entities);
TimelineCollectorContext context = getTimelineEntityContext();
return writer.write(context.getClusterId(), context.getUserId(),
context.getFlowId(), context.getFlowRunId(), context.getAppId(),
entities);
}
/**
@ -111,12 +115,15 @@ public TimelineWriteResponse postEntities(TimelineEntities entities,
* @param entities entities to post
* @param callerUgi the caller UGI
*/
public void postEntitiesAsync(TimelineEntities entities,
public void putEntitiesAsync(TimelineEntities entities,
UserGroupInformation callerUgi) {
// TODO implement
if (LOG.isDebugEnabled()) {
LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" +
callerUgi + ")");
}
}
protected abstract TimelineCollectorContext getTimelineEntityContext();
}
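The rename makes the collector API mirror the REST verb. A short usage sketch, assuming "collector", "entities" and "callerUgi" are in scope, and that TimelineWriteResponse exposes its per-entity errors via getErrors():

// Synchronous put: reserved for critical entities and events.
TimelineWriteResponse response = collector.putEntities(entities, callerUgi);
if (!response.getErrors().isEmpty()) {
// handle the per-entity write errors reported by the backing storage
}
// Asynchronous put for voluminous writes (still a TODO in this patch).
collector.putEntitiesAsync(entities, callerUgi);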

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.collector;
public class TimelineCollectorContext {
private String clusterId;
private String userId;
private String flowId;
private String flowRunId;
private String appId;
public TimelineCollectorContext() {
this(null, null, null, null, null);
}
public TimelineCollectorContext(String clusterId, String userId,
String flowId, String flowRunId, String appId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowId = flowId;
this.flowRunId = flowRunId;
this.appId = appId;
}
public String getClusterId() {
return clusterId;
}
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getFlowId() {
return flowId;
}
public void setFlowId(String flowId) {
this.flowId = flowId;
}
public String getFlowRunId() {
return flowRunId;
}
public void setFlowRunId(String flowRunId) {
this.flowRunId = flowRunId;
}
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
}

View File

@ -43,6 +43,8 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@ -102,6 +104,7 @@ public void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
nmCollectorService = getNMCollectorService();
startWebApp();
super.serviceStart();
}
@ -151,11 +154,11 @@ public TimelineCollector putIfAbsent(ApplicationId appId,
// Report to NM if a new collector is added.
if (collectorIsNew) {
try {
updateTimelineCollectorContext(appId, collector);
reportNewCollectorToNM(appId);
} catch (Exception e) {
// throw exception here as it cannot be used if failed report to NM
LOG.error("Failed to report a new collector for application: " + appId +
" to the NM Collector Service.");
// throw exception here as the collector cannot be used if we failed to communicate with the NM
LOG.error("Failed to communicate with NM Collector Service for " + appId);
throw new YarnRuntimeException(e);
}
}
@ -250,7 +253,6 @@ private void startWebApp() {
private void reportNewCollectorToNM(ApplicationId appId)
throws YarnException, IOException {
this.nmCollectorService = getNMCollectorService();
ReportNewCollectorInfoRequest request =
ReportNewCollectorInfoRequest.newInstance(appId,
this.timelineRestServerBindAddress);
@ -259,6 +261,28 @@ private void reportNewCollectorToNM(ApplicationId appId)
nmCollectorService.reportNewCollectorInfo(request);
}
private void updateTimelineCollectorContext(
ApplicationId appId, TimelineCollector collector)
throws YarnException, IOException {
GetTimelineCollectorContextRequest request =
GetTimelineCollectorContextRequest.newInstance(appId);
LOG.info("Get timeline collector context for " + appId);
GetTimelineCollectorContextResponse response =
nmCollectorService.getTimelineCollectorContext(request);
String userId = response.getUserId();
if (userId != null && !userId.isEmpty()) {
collector.getTimelineEntityContext().setUserId(userId);
}
String flowId = response.getFlowId();
if (flowId != null && !flowId.isEmpty()) {
collector.getTimelineEntityContext().setFlowId(flowId);
}
String flowRunId = response.getFlowRunId();
if (flowRunId != null && !flowRunId.isEmpty()) {
collector.getTimelineEntityContext().setFlowRunId(flowRunId);
}
}
@VisibleForTesting
protected CollectorNodemanagerProtocol getNMCollectorService() {
Configuration conf = getConfig();

View File

@ -138,7 +138,7 @@ public Response putEntities(
LOG.error("Application not found");
throw new NotFoundException(); // different exception?
}
collector.postEntities(entities, callerUgi);
collector.putEntities(entities, callerUgi);
return Response.ok().build();
} catch (Exception e) {
LOG.error("Error putting entities", e);

View File

@ -52,7 +52,9 @@ public class FileSystemTimelineWriterImpl extends AbstractService
/** default value for storage location on local disk */
public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
= "/tmp/timeline_service_data/";
= "/tmp/timeline_service_data";
private static final String ENTITIES_DIR = "entities";
/** Default extension for output files */
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
@ -61,38 +63,25 @@ public class FileSystemTimelineWriterImpl extends AbstractService
super((FileSystemTimelineWriterImpl.class.getName()));
}
/**
* Stores the entire information in {@link TimelineEntity} to the
* timeline store. Any errors occurring for individual write request objects
* will be reported in the response.
*
* @param data
* a {@link TimelineEntity} object
* @return {@link TimelineWriteResponse} object.
* @throws IOException
*/
@Override
public TimelineWriteResponse write(TimelineEntities entities)
throws IOException {
public TimelineWriteResponse write(String clusterId, String userId,
String flowId, String flowRunId, String appId,
TimelineEntities entities) throws IOException {
TimelineWriteResponse response = new TimelineWriteResponse();
for (TimelineEntity entity : entities.getEntities()) {
write(entity, response);
write(clusterId, userId, flowId, flowRunId, appId, entity, response);
}
return response;
}
private void write(TimelineEntity entity,
private void write(String clusterId, String userId,
String flowId, String flowRunId, String appId, TimelineEntity entity,
TimelineWriteResponse response) throws IOException {
PrintWriter out = null;
try {
File outputDir = new File(outputRoot + entity.getType());
String fileName = outputDir + "/" + entity.getId()
+ TIMELINE_SERVICE_STORAGE_EXTENSION;
if (!outputDir.exists()) {
if (!outputDir.mkdirs()) {
throw new IOException("Could not create directories for " + fileName);
}
}
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId, flowId,
flowRunId, appId, entity.getType());
String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION;
out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
out.write("\n");
@ -112,20 +101,7 @@ private void write(TimelineEntity entity,
}
}
/**
* Aggregates the entity information to the timeline store based on which
* track this entity is to be rolled up to The tracks along which aggregations
* are to be done are given by {@link TimelineAggregationTrack}
*
* Any errors occurring for individual write request objects will be reported
* in the response.
*
* @param data
* a {@link TimelineEntity} object
* a {@link TimelineAggregationTrack} enum value
* @return a {@link TimelineWriteResponse} object.
* @throws IOException
*/
@Override
public TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException {
return null;
@ -141,4 +117,23 @@ public void serviceInit(Configuration conf) throws Exception {
outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
}
@Override
public void serviceStart() throws Exception {
mkdirs(outputRoot, ENTITIES_DIR);
}
private static String mkdirs(String... dirStrs) throws IOException {
StringBuilder path = new StringBuilder();
for (String dirStr : dirStrs) {
path.append(dirStr).append('/');
File dir = new File(path.toString());
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw new IOException("Could not create directories for " + dir);
}
}
}
return path.toString();
}
}
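The net effect of threading the context through write() is a context-keyed layout: root/entities/cluster/user/flow/flowRun/app/entityType/entityId.thist. A sketch of one resulting path, assuming the default root and cluster ID plus hypothetical user and flow values (the distributed shell test above reconstructs paths the same way):

String path = String.join("/",
"/tmp/timeline_service_data", "entities",
"yarn_cluster", // cluster ID (DEFAULT_RM_CLUSTER_ID)
"alice", // user ID from the collector context (hypothetical)
"flow_1427385678901_1", // flow ID, here the app-ID-derived default
"0", // flow run ID, the default for orphan apps
"application_1427385678901_0001", // app ID
"DS_APP_ATTEMPT", // entity type
"appattempt_1427385678901_0001_000001.thist"); // entity ID + extension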

View File

@ -39,12 +39,19 @@ public interface TimelineWriter extends Service {
* timeline store. Any errors occurring for individual write request objects
* will be reported in the response.
*
* @param clusterId context cluster ID
* @param userId context user ID
* @param flowId context flow ID
* @param flowRunId context flow run ID
* @param appId context app ID
* @param data
* a {@link TimelineEntities} object.
* @return a {@link TimelineWriteResponse} object.
* @throws IOException
*/
TimelineWriteResponse write(TimelineEntities data) throws IOException;
TimelineWriteResponse write(String clusterId, String userId,
String flowId, String flowRunId, String appId,
TimelineEntities data) throws IOException;
/**
* Aggregates the entity information to the timeline store based on which

View File

@ -22,6 +22,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -29,16 +30,25 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.junit.After;
import org.junit.Test;
import java.io.IOException;
public class TestPerNodeTimelineCollectorsAuxService {
private ApplicationAttemptId appAttemptId;
private PerNodeTimelineCollectorsAuxService auxService;
public TestPerNodeTimelineCollectorsAuxService() {
ApplicationId appId =
@ -46,10 +56,16 @@ public TestPerNodeTimelineCollectorsAuxService() {
appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
}
@After
public void tearDown() throws Shell.ExitCodeException {
if (auxService != null) {
auxService.stop();
}
}
@Test
public void testAddApplication() throws Exception {
PerNodeTimelineCollectorsAuxService auxService =
createCollectorAndAddApplication();
auxService = createCollectorAndAddApplication();
// auxService should have a single app
assertTrue(auxService.hasApplication(
appAttemptId.getApplicationId().toString()));
@ -58,7 +74,7 @@ public void testAddApplication() throws Exception {
@Test
public void testAddApplicationNonAMContainer() throws Exception {
PerNodeTimelineCollectorsAuxService auxService = createCollector();
auxService = createCollector();
ContainerId containerId = getContainerId(2L); // not an AM
ContainerInitializationContext context =
@ -72,8 +88,7 @@ public void testAddApplicationNonAMContainer() throws Exception {
@Test
public void testRemoveApplication() throws Exception {
PerNodeTimelineCollectorsAuxService auxService =
createCollectorAndAddApplication();
auxService = createCollectorAndAddApplication();
// auxService should have a single app
String appIdStr = appAttemptId.getApplicationId().toString();
assertTrue(auxService.hasApplication(appIdStr));
@ -90,8 +105,7 @@ public void testRemoveApplication() throws Exception {
@Test
public void testRemoveApplicationNonAMContainer() throws Exception {
PerNodeTimelineCollectorsAuxService auxService =
createCollectorAndAddApplication();
auxService = createCollectorAndAddApplication();
// auxService should have a single app
String appIdStr = appAttemptId.getApplicationId().toString();
assertTrue(auxService.hasApplication(appIdStr));
@ -109,7 +123,6 @@ public void testRemoveApplicationNonAMContainer() throws Exception {
@Test(timeout = 60000)
public void testLaunch() throws Exception {
ExitUtil.disableSystemExit();
PerNodeTimelineCollectorsAuxService auxService = null;
try {
auxService =
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
@ -118,10 +131,6 @@ public void testLaunch() throws Exception {
assertEquals(0, e.status);
ExitUtil.resetFirstExitException();
fail();
} finally {
if (auxService != null) {
auxService.stop();
}
}
}
@ -141,6 +150,8 @@ private PerNodeTimelineCollectorsAuxService createCollector() {
TimelineCollectorManager collectorManager = createCollectorManager();
PerNodeTimelineCollectorsAuxService auxService =
spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
auxService.init(new YarnConfiguration());
auxService.start();
return auxService;
}
@ -150,6 +161,14 @@ private TimelineCollectorManager createCollectorManager() {
doReturn(new Configuration()).when(collectorManager).getConfig();
CollectorNodemanagerProtocol nmCollectorService =
mock(CollectorNodemanagerProtocol.class);
// Stub the context lookup so the mocked NM collector service returns an
// empty collector context instead of contacting a real node manager.
GetTimelineCollectorContextResponse response =
GetTimelineCollectorContextResponse.newInstance(null, null, null);
try {
when(nmCollectorService.getTimelineCollectorContext(any(
GetTimelineCollectorContextRequest.class))).thenReturn(response);
} catch (YarnException | IOException e) {
fail();
}
doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
return collectorManager;
}

View File

@ -20,10 +20,14 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@ -33,15 +37,34 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestTimelineCollectorManager {
private TimelineCollectorManager collectorManager;
@Before
public void setup() throws Exception {
collectorManager = createCollectorManager();
collectorManager.init(new YarnConfiguration());
collectorManager.start();
}
@After
public void tearDown() throws Exception {
if (collectorManager != null) {
collectorManager.stop();
}
}
@Test(timeout=60000)
public void testMultithreadedAdd() throws Exception {
final TimelineCollectorManager collectorManager = createCollectorManager();
final int NUM_APPS = 5;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < NUM_APPS; i++) {
@ -49,7 +72,7 @@ public void testMultithreadedAdd() throws Exception {
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(appId.toString());
new AppLevelTimelineCollector(appId);
return (collectorManager.putIfAbsent(appId, collector) == collector);
}
};
@ -73,8 +96,6 @@ public Boolean call() {
@Test
public void testMultithreadedAddAndRemove() throws Exception {
final TimelineCollectorManager collectorManager = createCollectorManager();
final int NUM_APPS = 5;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < NUM_APPS; i++) {
@ -82,7 +103,7 @@ public void testMultithreadedAddAndRemove() throws Exception {
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(appId.toString());
new AppLevelTimelineCollector(appId);
boolean successPut =
(collectorManager.putIfAbsent(appId, collector) == collector);
return successPut && collectorManager.remove(appId.toString());
@ -112,6 +133,14 @@ private TimelineCollectorManager createCollectorManager() {
doReturn(new Configuration()).when(collectorManager).getConfig();
CollectorNodemanagerProtocol nmCollectorService =
mock(CollectorNodemanagerProtocol.class);
// Stub the context lookup so the mocked NM collector service returns an
// empty collector context instead of contacting a real node manager.
GetTimelineCollectorContextResponse response =
GetTimelineCollectorContextResponse.newInstance(null, null, null);
try {
when(nmCollectorService.getTimelineCollectorContext(any(
GetTimelineCollectorContextRequest.class))).thenReturn(response);
} catch (YarnException | IOException e) {
fail();
}
doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
return collectorManager;
}

View File

@ -28,9 +28,9 @@
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Test;
@ -52,13 +52,16 @@ public void testWriteEntityToFile() throws Exception {
entity.setModifiedTime(1425016502000L);
te.addEntity(entity);
try (FileSystemTimelineWriterImpl fsi =
new FileSystemTimelineWriterImpl()) {
fsi.serviceInit(new Configuration());
fsi.write(te);
FileSystemTimelineWriterImpl fsi = null;
try {
fsi = new FileSystemTimelineWriterImpl();
fsi.init(new YarnConfiguration());
fsi.start();
fsi.write("cluster_id", "user_id", "flow_id", "flow_run_id", "app_id", te);
String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String fileName = fsi.getOutputRoot() +
"/entities/cluster_id/user_id/flow_id/flow_run_id/app_id/" + type +
"/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path = Paths.get(fileName);
File f = new File(fileName);
assertTrue(f.exists() && !f.isDirectory());
@ -73,6 +76,11 @@ public void testWriteEntityToFile() throws Exception {
File outputDir = new File(fsi.getOutputRoot());
FileUtils.deleteDirectory(outputDir);
assertFalse(f.exists());
} finally {
if (fsi != null) {
fsi.stop();
FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
}
}
}
}