From 1bd345d6e3855ab330963efd32e0fac102e61d1a Mon Sep 17 00:00:00 2001 From: Hitesh Shah Date: Wed, 20 Mar 2013 20:44:35 +0000 Subject: [PATCH] YARN-396. Rationalize AllocateResponse in RM Scheduler API. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1459040 13f79535-47bb-0310-9956-ffa450edef68 --- .../v2/app/local/LocalContainerAllocator.java | 8 +- .../v2/app/rm/RMContainerAllocator.java | 9 +- .../v2/app/rm/RMContainerRequestor.java | 15 +- .../mapreduce/v2/app/MRAppBenchmark.java | 7 +- hadoop-yarn-project/CHANGES.txt | 3 + .../api/protocolrecords/AllocateResponse.java | 94 ++++- .../impl/pb/AllocateResponsePBImpl.java | 342 ++++++++++++++-- .../pb/GetAllApplicationsResponsePBImpl.java | 3 +- .../pb/GetClusterNodesResponsePBImpl.java | 3 +- .../GetQueueUserAclsInfoResponsePBImpl.java | 3 +- .../hadoop/yarn/api/records/AMResponse.java | 138 ------- .../api/records/impl/pb/AMResponsePBImpl.java | 373 ------------------ .../src/main/proto/yarn_protos.proto | 10 - .../src/main/proto/yarn_service_protos.proto | 9 +- .../distributedshell/ApplicationMaster.java | 15 +- .../hadoop/yarn/client/AMRMClientImpl.java | 6 +- .../hadoop/yarn/client/TestAMRMClient.java | 14 +- .../apache/hadoop/yarn/TestRecordFactory.java | 12 +- .../ApplicationMasterService.java | 51 ++- .../yarn/server/resourcemanager/MockAM.java | 13 +- .../resourcemanager/TestFifoScheduler.java | 18 +- .../server/resourcemanager/TestRMRestart.java | 7 +- .../TestAMRMRPCNodeUpdates.java | 18 +- .../TestAMRMRPCResponseId.java | 10 +- .../security/TestApplicationTokens.java | 6 +- .../server/TestContainerManagerSecurity.java | 4 +- .../site/apt/WritingYarnApplications.apt.vm | 16 +- 27 files changed, 518 insertions(+), 689 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index a3299d24d5..abb2397e29 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -100,10 +99,9 @@ protected synchronized void heartbeat() throws Exception { this.applicationAttemptId, this.lastResponseID, super .getApplicationProgress(), new ArrayList(), new ArrayList()); - AMResponse response; + AllocateResponse allocateResponse; try { - AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); - response = allocateResponse.getAMResponse(); + allocateResponse = scheduler.allocate(allocateRequest); // Reset retry count if no exception occurred. retrystartTime = System.currentTimeMillis(); } catch (Exception e) { @@ -120,7 +118,7 @@ protected synchronized void heartbeat() throws Exception { // continue to attempt to contact the RM. throw e; } - if (response.getReboot()) { + if (allocateResponse.getReboot()) { LOG.info("Event from RM: shutting down Application Master"); // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 430d8fd945..d29d11890c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -59,7 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.YarnException; -import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -544,8 +544,9 @@ public void rampDownReduces(int rampDown) { @SuppressWarnings("unchecked") private List getResources() throws Exception { - int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null - AMResponse response; + int headRoom = getAvailableResources() != null + ? getAvailableResources().getMemory() : 0;//first time it would be null + AllocateResponse response; /* * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS * milliseconds before aborting. During this interval, AM will still try @@ -634,7 +635,7 @@ public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, } @SuppressWarnings("unchecked") - private void handleUpdatedNodes(AMResponse response) { + private void handleUpdatedNodes(AllocateResponse response) { // send event to the job about on updated nodes List updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 5f3a7f5f96..d9a2c1f4ec 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -146,30 +145,30 @@ public void init(Configuration conf) { LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); } - protected AMResponse makeRemoteRequest() throws YarnRemoteException { + protected AllocateResponse makeRemoteRequest() throws YarnRemoteException { AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( applicationAttemptId, lastResponseID, super.getApplicationProgress(), new ArrayList(ask), new ArrayList( release)); AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); - AMResponse response = allocateResponse.getAMResponse(); - lastResponseID = response.getResponseId(); - availableResources = response.getAvailableResources(); + lastResponseID = allocateResponse.getResponseId(); + availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; clusterNmCount = allocateResponse.getNumClusterNodes(); if (ask.size() > 0 || release.size() > 0) { LOG.info("getResources() for " + applicationId + ":" + " ask=" + ask.size() + " release= " + release.size() + " newContainers=" - + response.getAllocatedContainers().size() + " finishedContainers=" - + response.getCompletedContainersStatuses().size() + + allocateResponse.getAllocatedContainers().size() + + " finishedContainers=" + + allocateResponse.getCompletedContainersStatuses().size() + " resourcelimit=" + availableResources + " knownNMs=" + clusterNmCount); } ask.clear(); release.clear(); - return response; + return allocateResponse; } // May be incorrect if there's multiple NodeManagers running on a single host. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java index 6cc772911d..e50d91c0cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -248,10 +247,8 @@ public AllocateResponse allocate(AllocateRequest request) } } - AMResponse amResponse = Records.newRecord(AMResponse.class); - amResponse.setAllocatedContainers(containers); - amResponse.setResponseId(request.getResponseId() + 1); - response.setAMResponse(amResponse); + response.setAllocatedContainers(containers); + response.setResponseId(request.getResponseId() + 1); response.setNumClusterNodes(350); return response; } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c5227d588d..f3f6f8ec7f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -52,6 +52,9 @@ Release 2.0.5-beta - UNRELEASED INCOMPATIBLE CHANGES + YARN-396. Rationalize AllocateResponse in RM Scheduler API. (Zhijie Shen + via hitesh) + NEW FEATURES IMPROVEMENTS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index a9ef899420..0426ee359a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -18,19 +18,23 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.List; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; /** *

The response sent by the ResourceManager the * ApplicationMaster during resource negotiation.

* - *

The response, via {@link AMResponse}, includes: + *

The response, includes: *

    *
  • Response ID to track duplicate responses.
  • *
  • @@ -42,6 +46,8 @@ * The available headroom for resources in the cluster for the * application. *
  • + *
  • A list of nodes whose status has been updated.
  • + *
  • The number of available nodes in a cluster.
  • *
*

* @@ -51,18 +57,90 @@ @Stable public interface AllocateResponse { /** - * Get the {@link AMResponse} sent by the ResourceManager. - * @return AMResponse sent by the ResourceManager + * Should the ApplicationMaster reboot for being horribly + * out-of-sync with the ResourceManager as deigned by + * {@link #getResponseId()}? + * + * @return true if the ApplicationMaster should + * reboot, false otherwise */ @Public @Stable - public abstract AMResponse getAMResponse(); + public boolean getReboot(); @Private @Unstable - public abstract void setAMResponse(AMResponse amResponse); - - + public void setReboot(boolean reboot); + + /** + * Get the last response id. + * @return last response id + */ + @Public + @Stable + public int getResponseId(); + + @Private + @Unstable + public void setResponseId(int responseId); + + /** + * Get the list of newly allocated Container by the + * ResourceManager. + * @return list of newly allocated Container + */ + @Public + @Stable + public List getAllocatedContainers(); + + /** + * Set the list of newly allocated Container by the + * ResourceManager. + * @param containers list of newly allocated Container + */ + @Public + @Stable + public void setAllocatedContainers(List containers); + + /** + * Get the available headroom for resources in the cluster for the + * application. + * @return limit of available headroom for resources in the cluster for the + * application + */ + @Public + @Stable + public Resource getAvailableResources(); + + @Private + @Unstable + public void setAvailableResources(Resource limit); + + /** + * Get the list of completed containers' statuses. + * @return the list of completed containers' statuses + */ + @Public + @Stable + public List getCompletedContainersStatuses(); + + @Private + @Unstable + public void setCompletedContainersStatuses(List containers); + + /** + * Get the list of updated NodeReports. Updates could + * be changes in health, availability etc of the nodes. + * @return The delta of updated nodes since the last response + */ + @Public + @Unstable + public List getUpdatedNodes(); + + @Private + @Unstable + public void setUpdatedNodes(final List updatedNodes); + /** * Get the number of hosts available on the cluster. * @return the available host count. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 971f23a771..4643e4ed02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -19,11 +19,24 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; @@ -35,7 +48,12 @@ public class AllocateResponsePBImpl extends ProtoBase AllocateResponseProto.Builder builder = null; boolean viaProto = false; - private AMResponse amResponse; + Resource limit; + + private List allocatedContainers = null; + private List completedContainersStatuses = null; + + private List updatedNodes = null; public AllocateResponsePBImpl() { @@ -47,20 +65,38 @@ public AllocateResponsePBImpl(AllocateResponseProto proto) { viaProto = true; } - public AllocateResponseProto getProto() { + public synchronized AllocateResponseProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; } - private void mergeLocalToBuilder() { - if (this.amResponse != null) { - builder.setAMResponse(convertToProtoFormat(this.amResponse)); + private synchronized void mergeLocalToBuilder() { + if (this.allocatedContainers != null) { + builder.clearAllocatedContainers(); + Iterable iterable = + getProtoIterable(this.allocatedContainers); + builder.addAllAllocatedContainers(iterable); + } + if (this.completedContainersStatuses != null) { + builder.clearCompletedContainerStatuses(); + Iterable iterable = + getContainerStatusProtoIterable(this.completedContainersStatuses); + builder.addAllCompletedContainerStatuses(iterable); + } + if (this.updatedNodes != null) { + builder.clearUpdatedNodes(); + Iterable iterable = + getNodeReportProtoIterable(this.updatedNodes); + builder.addAllUpdatedNodes(iterable); + } + if (this.limit != null) { + builder.setLimit(convertToProtoFormat(this.limit)); } } - private void mergeLocalToProto() { + private synchronized void mergeLocalToProto() { if (viaProto) maybeInitBuilder(); mergeLocalToBuilder(); @@ -68,53 +104,293 @@ private void mergeLocalToProto() { viaProto = true; } - private void maybeInitBuilder() { + private synchronized void maybeInitBuilder() { if (viaProto || builder == null) { builder = AllocateResponseProto.newBuilder(proto); } viaProto = false; } - @Override - public AMResponse getAMResponse() { + public synchronized boolean getReboot() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - if (this.amResponse != null) { - return this.amResponse; - } - if (!p.hasAMResponse()) { - return null; - } - this.amResponse= convertFromProtoFormat(p.getAMResponse()); - return this.amResponse; + return (p.getReboot()); } @Override - public void setAMResponse(AMResponse aMResponse) { + public synchronized void setReboot(boolean reboot) { maybeInitBuilder(); - if (aMResponse == null) - builder.clearAMResponse(); - this.amResponse = aMResponse; + builder.setReboot((reboot)); } - + @Override - public int getNumClusterNodes() { + public synchronized int getResponseId() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.getResponseId()); + } + + @Override + public synchronized void setResponseId(int responseId) { + maybeInitBuilder(); + builder.setResponseId((responseId)); + } + + @Override + public synchronized Resource getAvailableResources() { + if (this.limit != null) { + return this.limit; + } + + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasLimit()) { + return null; + } + this.limit = convertFromProtoFormat(p.getLimit()); + return this.limit; + } + + @Override + public synchronized void setAvailableResources(Resource limit) { + maybeInitBuilder(); + if (limit == null) + builder.clearLimit(); + this.limit = limit; + } + + @Override + public synchronized List getUpdatedNodes() { + initLocalNewNodeReportList(); + return this.updatedNodes; + } + @Override + public synchronized void setUpdatedNodes( + final List updatedNodes) { + if (updatedNodes == null) { + this.updatedNodes.clear(); + return; + } + this.updatedNodes = new ArrayList(updatedNodes.size()); + this.updatedNodes.addAll(updatedNodes); + } + + @Override + public synchronized List getAllocatedContainers() { + initLocalNewContainerList(); + return this.allocatedContainers; + } + + @Override + public synchronized void setAllocatedContainers( + final List containers) { + if (containers == null) + return; + // this looks like a bug because it results in append and not set + initLocalNewContainerList(); + allocatedContainers.addAll(containers); + } + + //// Finished containers + @Override + public synchronized List getCompletedContainersStatuses() { + initLocalFinishedContainerList(); + return this.completedContainersStatuses; + } + + @Override + public synchronized void setCompletedContainersStatuses( + final List containers) { + if (containers == null) + return; + initLocalFinishedContainerList(); + completedContainersStatuses.addAll(containers); + } + + @Override + public synchronized int getNumClusterNodes() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; return p.getNumClusterNodes(); } - + @Override - public void setNumClusterNodes(int numNodes) { + public synchronized void setNumClusterNodes(int numNodes) { maybeInitBuilder(); builder.setNumClusterNodes(numNodes); } - - private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) { - return new AMResponsePBImpl(p); + // Once this is called. updatedNodes will never be null - until a getProto is + // called. + private synchronized void initLocalNewNodeReportList() { + if (this.updatedNodes != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getUpdatedNodesList(); + updatedNodes = new ArrayList(list.size()); + + for (NodeReportProto n : list) { + updatedNodes.add(convertFromProtoFormat(n)); + } } - private AMResponseProto convertToProtoFormat(AMResponse t) { - return ((AMResponsePBImpl)t).getProto(); + // Once this is called. containerList will never be null - until a getProto + // is called. + private synchronized void initLocalNewContainerList() { + if (this.allocatedContainers != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getAllocatedContainersList(); + allocatedContainers = new ArrayList(); + + for (ContainerProto c : list) { + allocatedContainers.add(convertFromProtoFormat(c)); + } } + + private synchronized Iterable getProtoIterable( + final List newContainersList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = newContainersList.iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized ContainerProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + } + + private synchronized Iterable + getContainerStatusProtoIterable( + final List newContainersList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = newContainersList.iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized ContainerStatusProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + } + + private synchronized Iterable + getNodeReportProtoIterable( + final List newNodeReportsList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = newNodeReportsList.iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized NodeReportProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + } + + // Once this is called. containerList will never be null - until a getProto + // is called. + private synchronized void initLocalFinishedContainerList() { + if (this.completedContainersStatuses != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getCompletedContainerStatusesList(); + completedContainersStatuses = new ArrayList(); + + for (ContainerStatusProto c : list) { + completedContainersStatuses.add(convertFromProtoFormat(c)); + } + } + + private synchronized NodeReportPBImpl convertFromProtoFormat( + NodeReportProto p) { + return new NodeReportPBImpl(p); + } + + private synchronized NodeReportProto convertToProtoFormat(NodeReport t) { + return ((NodeReportPBImpl)t).getProto(); + } + + private synchronized ContainerPBImpl convertFromProtoFormat( + ContainerProto p) { + return new ContainerPBImpl(p); + } + + private synchronized ContainerProto convertToProtoFormat(Container t) { + return ((ContainerPBImpl)t).getProto(); + } + + private synchronized ContainerStatusPBImpl convertFromProtoFormat( + ContainerStatusProto p) { + return new ContainerStatusPBImpl(p); + } + + private synchronized ContainerStatusProto convertToProtoFormat( + ContainerStatus t) { + return ((ContainerStatusPBImpl)t).getProto(); + } + + private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) { + return new ResourcePBImpl(p); + } + + private synchronized ResourceProto convertToProtoFormat(Resource r) { + return ((ResourcePBImpl) r).getProto(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetAllApplicationsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetAllApplicationsResponsePBImpl.java index 0a22383e8c..956f589326 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetAllApplicationsResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetAllApplicationsResponsePBImpl.java @@ -93,7 +93,8 @@ private void maybeInitBuilder() { viaProto = false; } - //Once this is called. containerList will never be null - untill a getProto is called. + // Once this is called. containerList will never be null - until a getProto + // is called. private void initLocalApplicationsList() { if (this.applicationList != null) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterNodesResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterNodesResponsePBImpl.java index d7269cebf5..b549ce1931 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterNodesResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterNodesResponsePBImpl.java @@ -92,7 +92,8 @@ private void maybeInitBuilder() { viaProto = false; } - //Once this is called. containerList will never be null - untill a getProto is called. + // Once this is called. containerList will never be null - until a getProto + // is called. private void initLocalNodeManagerInfosList() { if (this.nodeManagerInfoList != null) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetQueueUserAclsInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetQueueUserAclsInfoResponsePBImpl.java index d44c752711..16ef53fecc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetQueueUserAclsInfoResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetQueueUserAclsInfoResponsePBImpl.java @@ -94,7 +94,8 @@ private void maybeInitBuilder() { viaProto = false; } - //Once this is called. containerList will never be null - untill a getProto is called. + // Once this is called. containerList will never be null - until a getProto + // is called. private void initLocalQueueUserAclsList() { if (this.queueUserAclsInfoList != null) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java deleted file mode 100644 index 2e14441211..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java +++ /dev/null @@ -1,138 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.api.records; - -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Stable; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; - -/** - *

The response sent by the ResourceManager the - * ApplicationMaster during resource negotiation.

- * - *

The response includes: - *

    - *
  • Response ID to track duplicate responses.
  • - *
  • - * A reboot flag to let the ApplicationMaster know that its - * horribly out of sync and needs to reboot.
  • - *
  • A list of newly allocated {@link Container}.
  • - *
  • A list of completed {@link Container}.
  • - *
  • - * The available headroom for resources in the cluster for the - * application. - *
  • - *
- *

- * - * @see AMRMProtocol#allocate(AllocateRequest) - */ -@Public -@Unstable -public interface AMResponse { - /** - * Should the ApplicationMaster reboot for being horribly - * out-of-sync with the ResourceManager as deigned by - * {@link #getResponseId()}? - * - * @return true if the ApplicationMaster should - * reboot, false otherwise - */ - @Public - @Stable - public boolean getReboot(); - - @Private - @Unstable - public void setReboot(boolean reboot); - - /** - * Get the last response id. - * @return last response id - */ - @Public - @Stable - public int getResponseId(); - - @Private - @Unstable - public void setResponseId(int responseId); - - /** - * Get the list of newly allocated Container by the - * ResourceManager. - * @return list of newly allocated Container - */ - @Public - @Stable - public List getAllocatedContainers(); - - /** - * Set the list of newly allocated Container by the - * ResourceManager. - * @param containers list of newly allocated Container - */ - @Public - @Stable - public void setAllocatedContainers(List containers); - - /** - * Get the available headroom for resources in the cluster for the - * application. - * @return limit of available headroom for resources in the cluster for the - * application - */ - @Public - @Stable - public Resource getAvailableResources(); - - @Private - @Unstable - public void setAvailableResources(Resource limit); - - /** - * Get the list of completed containers' statuses. - * @return the list of completed containers' statuses - */ - @Public - @Stable - public List getCompletedContainersStatuses(); - - @Private - @Unstable - public void setCompletedContainersStatuses(List containers); - - /** - * Get the list of updated NodeReports. Updates could be - * changes in health, availability etc of the nodes. - * @return The delta of updated nodes since the last response - */ - @Public - @Unstable - public List getUpdatedNodes(); - - @Private - @Unstable - public void setUpdatedNodes(final List updatedNodes); -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java deleted file mode 100644 index 188a9c52ae..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java +++ /dev/null @@ -1,373 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records.impl.pb; - - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.yarn.api.records.AMResponse; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto; -import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; -import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; - - - -public class AMResponsePBImpl extends ProtoBase implements AMResponse { - AMResponseProto proto = AMResponseProto.getDefaultInstance(); - AMResponseProto.Builder builder = null; - boolean viaProto = false; - - Resource limit; - - private List allocatedContainers = null; - private List completedContainersStatuses = null; -// private boolean hasLocalContainerList = false; - - private List updatedNodes = null; - - public AMResponsePBImpl() { - builder = AMResponseProto.newBuilder(); - } - - public AMResponsePBImpl(AMResponseProto proto) { - this.proto = proto; - viaProto = true; - } - - public synchronized AMResponseProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - private synchronized void mergeLocalToBuilder() { - if (this.allocatedContainers != null) { - builder.clearAllocatedContainers(); - Iterable iterable = - getProtoIterable(this.allocatedContainers); - builder.addAllAllocatedContainers(iterable); - } - if (this.completedContainersStatuses != null) { - builder.clearCompletedContainerStatuses(); - Iterable iterable = - getContainerStatusProtoIterable(this.completedContainersStatuses); - builder.addAllCompletedContainerStatuses(iterable); - } - if (this.updatedNodes != null) { - builder.clearUpdatedNodes(); - Iterable iterable = - getNodeReportProtoIterable(this.updatedNodes); - builder.addAllUpdatedNodes(iterable); - } - if (this.limit != null) { - builder.setLimit(convertToProtoFormat(this.limit)); - } - } - - private synchronized void mergeLocalToProto() { - if (viaProto) - maybeInitBuilder(); - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private synchronized void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = AMResponseProto.newBuilder(proto); - } - viaProto = false; - } - - - @Override - public synchronized boolean getReboot() { - AMResponseProtoOrBuilder p = viaProto ? proto : builder; - return (p.getReboot()); - } - - @Override - public synchronized void setReboot(boolean reboot) { - maybeInitBuilder(); - builder.setReboot((reboot)); - } - @Override - public synchronized int getResponseId() { - AMResponseProtoOrBuilder p = viaProto ? proto : builder; - return (p.getResponseId()); - } - - @Override - public synchronized void setResponseId(int responseId) { - maybeInitBuilder(); - builder.setResponseId((responseId)); - } - @Override - public synchronized Resource getAvailableResources() { - if (this.limit != null) { - return this.limit; - } - - AMResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasLimit()) { - return null; - } - this.limit = convertFromProtoFormat(p.getLimit()); - return this.limit; - } - - @Override - public synchronized void setAvailableResources(Resource limit) { - maybeInitBuilder(); - if (limit == null) - builder.clearLimit(); - this.limit = limit; - } - - @Override - public synchronized List getUpdatedNodes() { - initLocalNewNodeReportList(); - return this.updatedNodes; - } - - //Once this is called. updatedNodes will never be null - until a getProto is called. - private synchronized void initLocalNewNodeReportList() { - if (this.updatedNodes != null) { - return; - } - AMResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getUpdatedNodesList(); - updatedNodes = new ArrayList(list.size()); - - for (NodeReportProto n : list) { - updatedNodes.add(convertFromProtoFormat(n)); - } - } - - @Override - public synchronized void setUpdatedNodes(final List updatedNodes) { - if (updatedNodes == null) { - this.updatedNodes.clear(); - return; - } - this.updatedNodes = new ArrayList(updatedNodes.size()); - this.updatedNodes.addAll(updatedNodes); - } - - @Override - public synchronized List getAllocatedContainers() { - initLocalNewContainerList(); - return this.allocatedContainers; - } - - //Once this is called. containerList will never be null - until a getProto is called. - private synchronized void initLocalNewContainerList() { - if (this.allocatedContainers != null) { - return; - } - AMResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getAllocatedContainersList(); - allocatedContainers = new ArrayList(); - - for (ContainerProto c : list) { - allocatedContainers.add(convertFromProtoFormat(c)); - } - } - - @Override - public synchronized void setAllocatedContainers(final List containers) { - if (containers == null) - return; - // this looks like a bug because it results in append and not set - initLocalNewContainerList(); - allocatedContainers.addAll(containers); - } - - private synchronized Iterable getProtoIterable( - final List newContainersList) { - maybeInitBuilder(); - return new Iterable() { - @Override - public synchronized Iterator iterator() { - return new Iterator() { - - Iterator iter = newContainersList.iterator(); - - @Override - public synchronized boolean hasNext() { - return iter.hasNext(); - } - - @Override - public synchronized ContainerProto next() { - return convertToProtoFormat(iter.next()); - } - - @Override - public synchronized void remove() { - throw new UnsupportedOperationException(); - - } - }; - - } - }; - } - - private synchronized Iterable - getContainerStatusProtoIterable( - final List newContainersList) { - maybeInitBuilder(); - return new Iterable() { - @Override - public synchronized Iterator iterator() { - return new Iterator() { - - Iterator iter = newContainersList.iterator(); - - @Override - public synchronized boolean hasNext() { - return iter.hasNext(); - } - - @Override - public synchronized ContainerStatusProto next() { - return convertToProtoFormat(iter.next()); - } - - @Override - public synchronized void remove() { - throw new UnsupportedOperationException(); - - } - }; - - } - }; - } - - private synchronized Iterable - getNodeReportProtoIterable( - final List newNodeReportsList) { - maybeInitBuilder(); - return new Iterable() { - @Override - public synchronized Iterator iterator() { - return new Iterator() { - - Iterator iter = newNodeReportsList.iterator(); - - @Override - public synchronized boolean hasNext() { - return iter.hasNext(); - } - - @Override - public synchronized NodeReportProto next() { - return convertToProtoFormat(iter.next()); - } - - @Override - public synchronized void remove() { - throw new UnsupportedOperationException(); - - } - }; - - } - }; - } - - //// Finished containers - @Override - public synchronized List getCompletedContainersStatuses() { - initLocalFinishedContainerList(); - return this.completedContainersStatuses; - } - - //Once this is called. containerList will never be null - untill a getProto is called. - private synchronized void initLocalFinishedContainerList() { - if (this.completedContainersStatuses != null) { - return; - } - AMResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getCompletedContainerStatusesList(); - completedContainersStatuses = new ArrayList(); - - for (ContainerStatusProto c : list) { - completedContainersStatuses.add(convertFromProtoFormat(c)); - } - } - - @Override - public synchronized void setCompletedContainersStatuses( - final List containers) { - if (containers == null) - return; - initLocalFinishedContainerList(); - completedContainersStatuses.addAll(containers); - } - - private synchronized NodeReportPBImpl convertFromProtoFormat( - NodeReportProto p) { - return new NodeReportPBImpl(p); - } - - private synchronized NodeReportProto convertToProtoFormat(NodeReport t) { - return ((NodeReportPBImpl)t).getProto(); - } - - private synchronized ContainerPBImpl convertFromProtoFormat( - ContainerProto p) { - return new ContainerPBImpl(p); - } - - private synchronized ContainerProto convertToProtoFormat(Container t) { - return ((ContainerPBImpl)t).getProto(); - } - - private synchronized ContainerStatusPBImpl convertFromProtoFormat( - ContainerStatusProto p) { - return new ContainerStatusPBImpl(p); - } - - private synchronized ContainerStatusProto convertToProtoFormat(ContainerStatus t) { - return ((ContainerStatusPBImpl)t).getProto(); - } - - private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) { - return new ResourcePBImpl(p); - } - - private synchronized ResourceProto convertToProtoFormat(Resource r) { - return ((ResourcePBImpl) r).getProto(); - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3fe519f857..66fcca0603 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -207,16 +207,6 @@ message ResourceRequestProto { optional int32 num_containers = 4; } -message AMResponseProto { - optional bool reboot = 1; - optional int32 response_id = 2; - repeated ContainerProto allocated_containers = 3; - repeated ContainerStatusProto completed_container_statuses = 4; - optional ResourceProto limit = 5; - repeated NodeReportProto updated_nodes = 6; -} - - //////////////////////////////////////////////////////////////////////// ////// From client_RM_Protocol ///////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 2c59e9fe5d..50d1cd320e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -59,8 +59,13 @@ message AllocateRequestProto { } message AllocateResponseProto { - optional AMResponseProto AM_response = 1; - optional int32 num_cluster_nodes = 2; + optional bool reboot = 1; + optional int32 response_id = 2; + repeated ContainerProto allocated_containers = 3; + repeated ContainerStatusProto completed_container_statuses = 4; + optional ResourceProto limit = 5; + repeated NodeReportProto updated_nodes = 6; + optional int32 num_cluster_nodes = 7; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 71a81b0496..1fd1bd26c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -510,10 +509,11 @@ public boolean run() throws YarnRemoteException { // Send the request to RM LOG.info("Asking RM for containers" + ", askCount=" + askCount); - AMResponse amResp = sendContainerAskToRM(); + AllocateResponse allocResp = sendContainerAskToRM(); // Retrieve list of allocated containers from the response - List allocatedContainers = amResp.getAllocatedContainers(); + List allocatedContainers = + allocResp.getAllocatedContainers(); LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); @@ -542,12 +542,12 @@ public boolean run() throws YarnRemoteException { // Check what the current available resources in the cluster are // TODO should we do anything if the available resources are not enough? - Resource availableResources = amResp.getAvailableResources(); + Resource availableResources = allocResp.getAvailableResources(); LOG.info("Current available resources in the cluster " + availableResources); // Check the completed containers - List completedContainers = amResp + List completedContainers = allocResp .getCompletedContainersStatuses(); LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size()); @@ -819,14 +819,13 @@ private ContainerRequest setupContainerAskForRM(int numContainers) { * @return Response from RM to AM with allocated containers * @throws YarnRemoteException */ - private AMResponse sendContainerAskToRM() throws YarnRemoteException { + private AllocateResponse sendContainerAskToRM() throws YarnRemoteException { float progressIndicator = (float) numCompletedContainers.get() / numTotalContainers; LOG.info("Sending request to RM for containers" + ", progress=" + progressIndicator); - AllocateResponse resp = resourceManager.allocate(progressIndicator); - return resp.getAMResponse(); + return resourceManager.allocate(progressIndicator); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java index 42b5adbbbf..9f4358717b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java @@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -194,13 +193,12 @@ public AllocateResponse allocate(float progressIndicator) } allocateResponse = rmClient.allocate(allocateRequest); - AMResponse response = allocateResponse.getAMResponse(); synchronized (this) { // update these on successful RPC clusterNodeCount = allocateResponse.getNumClusterNodes(); - lastResponseId = response.getResponseId(); - clusterAvailableResources = response.getAvailableResources(); + lastResponseId = allocateResponse.getResponseId(); + clusterAvailableResources = allocateResponse.getAvailableResources(); } } finally { // TODO how to differentiate remote yarn exception vs error in rpc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java index 95fae134b2..fd298ce96a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -202,9 +201,8 @@ private void testAllocation(final AMRMClientImpl amClient) assertTrue(amClient.release.size() == 0); assertTrue(nodeCount == amClient.getClusterNodeCount()); - AMResponse amResponse = allocResponse.getAMResponse(); - allocatedContainerCount += amResponse.getAllocatedContainers().size(); - for(Container container : amResponse.getAllocatedContainers()) { + allocatedContainerCount += allocResponse.getAllocatedContainers().size(); + for(Container container : allocResponse.getAllocatedContainers()) { ContainerId rejectContainerId = container.getId(); releases.add(rejectContainerId); amClient.releaseAssignedContainer(rejectContainerId); @@ -264,11 +262,11 @@ public AllocateResponse answer(InvocationOnMock invocation) while(!releases.isEmpty() || iterationsLeft-- > 0) { // inform RM of rejection AllocateResponse allocResponse = amClient.allocate(0.1f); - AMResponse amResponse = allocResponse.getAMResponse(); // RM did not send new containers because AM does not need any - assertTrue(amResponse.getAllocatedContainers().size() == 0); - if(amResponse.getCompletedContainersStatuses().size() > 0) { - for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) { + assertTrue(allocResponse.getAllocatedContainers().size() == 0); + if(allocResponse.getCompletedContainersStatuses().size() > 0) { + for(ContainerStatus cStatus :allocResponse + .getCompletedContainersStatuses()) { if(releases.contains(cStatus.getContainerId())) { assertTrue(cStatus.getState() == ContainerState.COMPLETE); assertTrue(cStatus.getExitStatus() == -100); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java index 337ba15c3c..19c50ce1cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java @@ -23,9 +23,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; -import org.apache.hadoop.yarn.api.records.AMResponse; -import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; import org.junit.Test; public class TestRecordFactory { @@ -35,15 +35,17 @@ public void testPbRecordFactory() { RecordFactory pbRecordFactory = RecordFactoryPBImpl.get(); try { - AMResponse response = pbRecordFactory.newRecordInstance(AMResponse.class); - Assert.assertEquals(AMResponsePBImpl.class, response.getClass()); + AllocateResponse response = + pbRecordFactory.newRecordInstance(AllocateResponse.class); + Assert.assertEquals(AllocateResponsePBImpl.class, response.getClass()); } catch (YarnException e) { e.printStackTrace(); Assert.fail("Failed to crete record"); } try { - AllocateRequest response = pbRecordFactory.newRecordInstance(AllocateRequest.class); + AllocateRequest response = + pbRecordFactory.newRecordInstance(AllocateRequest.class); Assert.assertEquals(AllocateRequestPBImpl.class, response.getClass()); } catch (YarnException e) { e.printStackTrace(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 057e92d542..60d4d7e9bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -78,10 +77,12 @@ public class ApplicationMasterService extends AbstractService implements private YarnScheduler rScheduler; private InetSocketAddress bindAddress; private Server server; - private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private final ConcurrentMap responseMap = - new ConcurrentHashMap(); - private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class); + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + private final ConcurrentMap responseMap = + new ConcurrentHashMap(); + private final AllocateResponse reboot = + recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { @@ -166,7 +167,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( authorizeRequest(applicationAttemptId); ApplicationId appID = applicationAttemptId.getApplicationId(); - AMResponse lastResponse = responseMap.get(applicationAttemptId); + AllocateResponse lastResponse = responseMap.get(applicationAttemptId); if (lastResponse == null) { String message = "Application doesn't exist in cache " + applicationAttemptId; @@ -214,7 +215,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( .getApplicationAttemptId(); authorizeRequest(applicationAttemptId); - AMResponse lastResponse = responseMap.get(applicationAttemptId); + AllocateResponse lastResponse = responseMap.get(applicationAttemptId); if (lastResponse == null) { String message = "Application doesn't exist in cache " + applicationAttemptId; @@ -248,25 +249,20 @@ public AllocateResponse allocate(AllocateRequest request) this.amLivelinessMonitor.receivedPing(appAttemptId); /* check if its in cache */ - AllocateResponse allocateResponse = recordFactory - .newRecordInstance(AllocateResponse.class); - AMResponse lastResponse = responseMap.get(appAttemptId); + AllocateResponse lastResponse = responseMap.get(appAttemptId); if (lastResponse == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - allocateResponse.setAMResponse(reboot); - return allocateResponse; + return reboot; } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { /* old heartbeat */ - allocateResponse.setAMResponse(lastResponse); - return allocateResponse; + return lastResponse; } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { LOG.error("Invalid responseid from appAttemptId " + appAttemptId); // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: // Reboot is not useful since after AM reboots, it will send register and // get an exception. Might as well throw an exception here. - allocateResponse.setAMResponse(reboot); - return allocateResponse; + return reboot; } // Allow only one thread in AM to do heartbeat at a time. @@ -288,7 +284,8 @@ public AllocateResponse allocate(AllocateRequest request) appAttemptId.getApplicationId()); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AMResponse response = recordFactory.newRecordInstance(AMResponse.class); + AllocateResponse allocateResponse = + recordFactory.newRecordInstance(AllocateResponse.class); // update the response with the deltas of node status changes List updatedNodes = new ArrayList(); @@ -311,34 +308,34 @@ public AllocateResponse allocate(AllocateRequest request) updatedNodeReports.add(report); } - response.setUpdatedNodes(updatedNodeReports); + allocateResponse.setUpdatedNodes(updatedNodeReports); } - response.setAllocatedContainers(allocation.getContainers()); - response.setCompletedContainersStatuses(appAttempt + allocateResponse.setAllocatedContainers(allocation.getContainers()); + allocateResponse.setCompletedContainersStatuses(appAttempt .pullJustFinishedContainers()); - response.setResponseId(lastResponse.getResponseId() + 1); - response.setAvailableResources(allocation.getResourceLimit()); + allocateResponse.setResponseId(lastResponse.getResponseId() + 1); + allocateResponse.setAvailableResources(allocation.getResourceLimit()); - AMResponse oldResponse = responseMap.put(appAttemptId, response); + AllocateResponse oldResponse = + responseMap.put(appAttemptId, allocateResponse); if (oldResponse == null) { // appAttempt got unregistered, remove it back out responseMap.remove(appAttemptId); String message = "App Attempt removed from the cache during allocate" + appAttemptId; LOG.error(message); - allocateResponse.setAMResponse(reboot); - return allocateResponse; + return reboot; } - allocateResponse.setAMResponse(response); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); return allocateResponse; } } public void registerAppAttempt(ApplicationAttemptId attemptId) { - AMResponse response = recordFactory.newRecordInstance(AMResponse.class); + AllocateResponse response = + recordFactory.newRecordInstance(AllocateResponse.class); response.setResponseId(0); LOG.info("Registering " + attemptId); responseMap.put(attemptId, response); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index ace5efb1fa..9eb5bfb9b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -96,14 +94,14 @@ public void addRequests(String[] hosts, int memory, int priority, requests.addAll(createReq(hosts, memory, priority, containers)); } - public AMResponse schedule() throws Exception { - AMResponse response = allocate(requests, releases); + public AllocateResponse schedule() throws Exception { + AllocateResponse response = allocate(requests, releases); requests.clear(); releases.clear(); return response; } - public AMResponse allocate( + public AllocateResponse allocate( String host, int memory, int numContainers, List releases) throws Exception { List reqs = createReq(new String[]{host}, memory, 1, numContainers); @@ -143,13 +141,12 @@ public ResourceRequest createResourceReq(String resource, int memory, int priori return req; } - public AMResponse allocate( + public AllocateResponse allocate( List resourceRequest, List releases) throws Exception { AllocateRequest req = BuilderUtils.newAllocateRequest(attemptId, ++responseId, 0F, resourceRequest, releases); - AllocateResponse resp = amRMProtocol.allocate(req); - return resp.getAMResponse(); + return amRMProtocol.allocate(req); } public void unregisterAppAttempt() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 9190433ce4..8965f40f1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -26,7 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -99,32 +99,32 @@ public void test() throws Exception { // add request for containers am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1); - AMResponse am1Response = am1.schedule(); // send the request + AllocateResponse alloc1Response = am1.schedule(); // send the request // add request for containers am2.addRequests(new String[] { "h1", "h2" }, 3 * GB, 0, 1); - AMResponse am2Response = am2.schedule(); // send the request + AllocateResponse alloc2Response = am2.schedule(); // send the request // kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0 nm1.nodeHeartbeat(true); - while (am1Response.getAllocatedContainers().size() < 1) { + while (alloc1Response.getAllocatedContainers().size() < 1) { LOG.info("Waiting for containers to be created for app 1..."); Thread.sleep(1000); - am1Response = am1.schedule(); + alloc1Response = am1.schedule(); } - while (am2Response.getAllocatedContainers().size() < 1) { + while (alloc2Response.getAllocatedContainers().size() < 1) { LOG.info("Waiting for containers to be created for app 2..."); Thread.sleep(1000); - am2Response = am2.schedule(); + alloc2Response = am2.schedule(); } // kick the scheduler, nothing given remaining 2 GB. nm2.nodeHeartbeat(true); - List allocated1 = am1Response.getAllocatedContainers(); + List allocated1 = alloc1Response.getAllocatedContainers(); Assert.assertEquals(1, allocated1.size()); Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory()); Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); - List allocated2 = am2Response.getAllocatedContainers(); + List allocated2 = alloc2Response.getAllocatedContainers(); Assert.assertEquals(1, allocated2.size()); Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory()); Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index babe79a2de..63868b0c49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -23,7 +23,7 @@ import java.util.Map; import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -213,9 +213,10 @@ public void testRMRestart() throws Exception { // verify old AM is not accepted // change running AM to talk to new RM am1.setAMRMProtocol(rm2.getApplicationMasterService()); - AMResponse amResponse = am1.allocate(new ArrayList(), + AllocateResponse allocResponse = am1.allocate( + new ArrayList(), new ArrayList()); - Assert.assertTrue(amResponse.getReboot()); + Assert.assertTrue(allocResponse.getReboot()); // NM should be rebooted on heartbeat, even first heartbeat for nm2 HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index d607c0181d..b5fb09a11d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -21,7 +21,7 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.Dispatcher; @@ -109,7 +109,7 @@ public void testAMRMUnusableNodes() throws Exception { // allocate request returns no updated node AllocateRequest allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1 .getAppAttemptId(), 0, 0F, null, null); - AMResponse response1 = amService.allocate(allocateRequest1).getAMResponse(); + AllocateResponse response1 = amService.allocate(allocateRequest1); List updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(0, updatedNodes.size()); @@ -118,7 +118,7 @@ public void testAMRMUnusableNodes() throws Exception { // allocate request returns updated node allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1 .getAppAttemptId(), response1.getResponseId(), 0F, null, null); - response1 = amService.allocate(allocateRequest1).getAMResponse(); + response1 = amService.allocate(allocateRequest1); updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); NodeReport nr = updatedNodes.iterator().next(); @@ -126,7 +126,7 @@ public void testAMRMUnusableNodes() throws Exception { Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState()); // resending the allocate request returns the same result - response1 = amService.allocate(allocateRequest1).getAMResponse(); + response1 = amService.allocate(allocateRequest1); updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); nr = updatedNodes.iterator().next(); @@ -138,7 +138,7 @@ public void testAMRMUnusableNodes() throws Exception { // subsequent allocate request returns delta allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1 .getAppAttemptId(), response1.getResponseId(), 0F, null, null); - response1 = amService.allocate(allocateRequest1).getAMResponse(); + response1 = amService.allocate(allocateRequest1); updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); nr = updatedNodes.iterator().next(); @@ -158,7 +158,7 @@ public void testAMRMUnusableNodes() throws Exception { // allocate request returns no updated node AllocateRequest allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2 .getAppAttemptId(), 0, 0F, null, null); - AMResponse response2 = amService.allocate(allocateRequest2).getAMResponse(); + AllocateResponse response2 = amService.allocate(allocateRequest2); updatedNodes = response2.getUpdatedNodes(); Assert.assertEquals(0, updatedNodes.size()); @@ -167,7 +167,7 @@ public void testAMRMUnusableNodes() throws Exception { // both AM's should get delta updated nodes allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1 .getAppAttemptId(), response1.getResponseId(), 0F, null, null); - response1 = amService.allocate(allocateRequest1).getAMResponse(); + response1 = amService.allocate(allocateRequest1); updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); nr = updatedNodes.iterator().next(); @@ -176,7 +176,7 @@ public void testAMRMUnusableNodes() throws Exception { allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2 .getAppAttemptId(), response2.getResponseId(), 0F, null, null); - response2 = amService.allocate(allocateRequest2).getAMResponse(); + response2 = amService.allocate(allocateRequest2); updatedNodes = response2.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); nr = updatedNodes.iterator().next(); @@ -186,7 +186,7 @@ public void testAMRMUnusableNodes() throws Exception { // subsequent allocate calls should return no updated nodes allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2 .getAppAttemptId(), response2.getResponseId(), 0F, null, null); - response2 = amService.allocate(allocateRequest2).getAMResponse(); + response2 = amService.allocate(allocateRequest2); updatedNodes = response2.getUpdatedNodes(); Assert.assertEquals(0, updatedNodes.size()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java index 3bc5547342..e336505847 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java @@ -21,7 +21,7 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; @@ -81,22 +81,22 @@ public void testARRMResponseId() throws Exception { AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(attempt .getAppAttemptId(), 0, 0F, null, null); - AMResponse response = amService.allocate(allocateRequest).getAMResponse(); + AllocateResponse response = amService.allocate(allocateRequest); Assert.assertEquals(1, response.getResponseId()); Assert.assertFalse(response.getReboot()); allocateRequest = BuilderUtils.newAllocateRequest(attempt .getAppAttemptId(), response.getResponseId(), 0F, null, null); - response = amService.allocate(allocateRequest).getAMResponse(); + response = amService.allocate(allocateRequest); Assert.assertEquals(2, response.getResponseId()); /* try resending */ - response = amService.allocate(allocateRequest).getAMResponse(); + response = amService.allocate(allocateRequest); Assert.assertEquals(2, response.getResponseId()); /** try sending old request again **/ allocateRequest = BuilderUtils.newAllocateRequest(attempt .getAppAttemptId(), 0, 0F, null, null); - response = amService.allocate(allocateRequest).getAMResponse(); + response = amService.allocate(allocateRequest); Assert.assertTrue(response.getReboot()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java index e778af0e76..412ba3e54d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java @@ -201,8 +201,7 @@ public void testMasterKeyRollOver() throws Exception { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setApplicationAttemptId(applicationAttemptId); - Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse() - .getReboot()); + Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot()); // Simulate a master-key-roll-over ApplicationTokenSecretManager appTokenSecretManager = @@ -218,8 +217,7 @@ public void testMasterKeyRollOver() throws Exception { rmClient = createRMClient(rm, conf, rpc, currentUser); allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setApplicationAttemptId(applicationAttemptId); - Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse() - .getReboot()); + Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot()); } finally { rm.stop(); if (rmClient != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 80567bc671..144420e511 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -487,7 +487,7 @@ private Container requestAndGetContainer(AMRMProtocol scheduler, BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask, new ArrayList()); List allocatedContainers = scheduler.allocate(allocateRequest) - .getAMResponse().getAllocatedContainers(); + .getAllocatedContainers(); // Modify ask to request no more. allocateRequest.clearAsks(); @@ -499,7 +499,7 @@ private Container requestAndGetContainer(AMRMProtocol scheduler, Thread.sleep(1000); allocateRequest.setResponseId(allocateRequest.getResponseId() + 1); allocatedContainers = scheduler.allocate(allocateRequest) - .getAMResponse().getAllocatedContainers(); + .getAllocatedContainers(); } Assert.assertNotNull("Container is not allocted!", allocatedContainers); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm index 7b72322fa1..d081b50db4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm @@ -493,7 +493,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications +---+ * The AllocateResponse sent back from the ResourceManager provides the - following information via the AMResponse object: + following information: * Reboot flag: For scenarios when the ApplicationMaster may get out of sync with the ResourceManager. @@ -511,7 +511,9 @@ Hadoop MapReduce Next Generation - Writing YARN Applications allocated container, it will receive an update from the ResourceManager when the container completes. The ApplicationMaster can look into the status of the completed container and take appropriate actions such as - re-trying a particular sub-task in case of a failure. + re-trying a particular sub-task in case of a failure. + + * Number of cluster nodes: The number of hosts available on the cluster. [] @@ -525,13 +527,11 @@ Hadoop MapReduce Next Generation - Writing YARN Applications containers. +---+ - // Get AMResponse from AllocateResponse - AMResponse amResp = allocateResponse.getAMResponse(); // Retrieve list of allocated containers from the response // and on each allocated container, lets assume we are launching // the same job. - List allocatedContainers = amResp.getAllocatedContainers(); + List allocatedContainers = allocateResponse.getAllocatedContainers(); for (Container allocatedContainer : allocatedContainers) { LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId() @@ -553,7 +553,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications } // Check what the current available resources in the cluster are - Resource availableResources = amResp.getAvailableResources(); + Resource availableResources = allocateResponse.getAvailableResources(); // Based on this information, an ApplicationMaster can make appropriate // decisions @@ -561,7 +561,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications // Let's assume we are keeping a count of total completed containers, // containers that failed and ones that completed successfully. List completedContainers = - amResp.getCompletedContainersStatuses(); + allocateResponse.getCompletedContainersStatuses(); for (ContainerStatus containerStatus : completedContainers) { LOG.info("Got container status for containerID= " + containerStatus.getContainerId() @@ -611,7 +611,7 @@ Hadoop MapReduce Next Generation - Writing YARN Applications +---+ - //Assuming an allocated Container obtained from AMResponse + //Assuming an allocated Container obtained from AllocateResponse Container container; // Connect to ContainerManager on the allocated container String cmIpPortStr = container.getNodeId().getHost() + ":"