YARN-441. Removed unused utility methods for collections from two API records. Contributed by Xuan Gong.
MAPREDUCE-5163. Update MR App to not use API utility methods for collections after YARN-441. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1469657 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
980e6c54ba
commit
8e1c2823fc
@ -320,6 +320,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||
MAPREDUCE-4932. mapreduce.job#getTaskCompletionEvents incompatible with
|
||||
Hadoop 1. (rkanter via tucu)
|
||||
|
||||
MAPREDUCE-5163. Update MR App to not use API utility methods for collections
|
||||
after YARN-441. (Xuan Gong via vinodkv)
|
||||
|
||||
Release 2.0.4-alpha - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -157,8 +157,9 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
|
||||
startRequest.setContainer(event.getAllocatedContainer());
|
||||
StartContainerResponse response = proxy.startContainer(startRequest);
|
||||
|
||||
ByteBuffer portInfo = response
|
||||
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
|
||||
ByteBuffer portInfo =
|
||||
response.getAllServiceResponse().get(
|
||||
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
|
||||
int port = -1;
|
||||
if(portInfo != null) {
|
||||
port = ShuffleHandler.deserializeMetaData(portInfo);
|
||||
|
@ -26,7 +26,11 @@
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
@ -58,6 +62,7 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestContainerLauncherImpl {
|
||||
@ -65,6 +70,15 @@ public class TestContainerLauncherImpl {
|
||||
private static final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
private Map<String, ByteBuffer> serviceResponse =
|
||||
new HashMap<String, ByteBuffer>();
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
serviceResponse.clear();
|
||||
serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||
ShuffleHandler.serializeMetaData(80));
|
||||
}
|
||||
|
||||
private static class ContainerLauncherImplUnderTest extends
|
||||
ContainerLauncherImpl {
|
||||
@ -145,8 +159,7 @@ public void testHandle() throws Exception {
|
||||
String cmAddress = "127.0.0.1:8000";
|
||||
StartContainerResponse startResp =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||
ShuffleHandler.serializeMetaData(80));
|
||||
startResp.setAllServiceResponse(serviceResponse);
|
||||
|
||||
|
||||
LOG.info("inserting launch event");
|
||||
@ -210,8 +223,7 @@ public void testOutOfOrder() throws Exception {
|
||||
String cmAddress = "127.0.0.1:8000";
|
||||
StartContainerResponse startResp =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||
ShuffleHandler.serializeMetaData(80));
|
||||
startResp.setAllServiceResponse(serviceResponse);
|
||||
|
||||
LOG.info("inserting cleanup event");
|
||||
ContainerLauncherEvent mockCleanupEvent =
|
||||
@ -275,8 +287,7 @@ public void testMyShutdown() throws Exception {
|
||||
String cmAddress = "127.0.0.1:8000";
|
||||
StartContainerResponse startResp =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||
ShuffleHandler.serializeMetaData(80));
|
||||
startResp.setAllServiceResponse(serviceResponse);
|
||||
|
||||
LOG.info("inserting launch event");
|
||||
ContainerRemoteLaunchEvent mockLaunchEvent =
|
||||
@ -333,8 +344,7 @@ public void testContainerCleaned() throws Exception {
|
||||
String cmAddress = "127.0.0.1:8000";
|
||||
StartContainerResponse startResp =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||
ShuffleHandler.serializeMetaData(80));
|
||||
startResp.setAllServiceResponse(serviceResponse);
|
||||
|
||||
|
||||
LOG.info("inserting launch event");
|
||||
|
@ -85,6 +85,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||
YARN-444. Moved special container exit codes from YarnConfiguration to API
|
||||
where they belong. (Sandy Ryza via vinodkv)
|
||||
|
||||
YARN-441. Removed unused utility methods for collections from two API
|
||||
records. (Xuan Gong via vinodkv)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
||||
|
@ -20,10 +20,8 @@
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
@ -120,36 +118,16 @@ public interface AllocateRequest {
|
||||
@Stable
|
||||
List<ResourceRequest> getAskList();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
ResourceRequest getAsk(int index);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
int getAskCount();
|
||||
|
||||
/**
|
||||
* Add list of <code>ResourceRequest</code> to update the
|
||||
* Set list of <code>ResourceRequest</code> to update the
|
||||
* <code>ResourceManager</code> about the application's resource requirements.
|
||||
* @param resourceRequest list of <code>ResourceRequest</code> to update the
|
||||
* @param resourceRequests list of <code>ResourceRequest</code> to update the
|
||||
* <code>ResourceManager</code> about the application's
|
||||
* resource requirements
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
void addAllAsks(List<ResourceRequest> resourceRequest);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void addAsk(ResourceRequest request);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void removeAsk(int index);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void clearAsks();
|
||||
void setAskList(List<ResourceRequest> resourceRequests);
|
||||
|
||||
/**
|
||||
* Get the list of <code>ContainerId</code> of containers being
|
||||
@ -160,17 +138,9 @@ public interface AllocateRequest {
|
||||
@Public
|
||||
@Stable
|
||||
List<ContainerId> getReleaseList();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
ContainerId getRelease(int index);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
int getReleaseCount();
|
||||
|
||||
/**
|
||||
* Add the list of <code>ContainerId</code> of containers being
|
||||
* Set the list of <code>ContainerId</code> of containers being
|
||||
* released by the <code>ApplicationMaster</code>
|
||||
* @param releaseContainers list of <code>ContainerId</code> of
|
||||
* containers being released by the <
|
||||
@ -178,17 +148,5 @@ public interface AllocateRequest {
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
void addAllReleases(List<ContainerId> releaseContainers);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void addRelease(ContainerId container);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void removeRelease(int index);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void clearReleases();
|
||||
void setReleaseList(List<ContainerId> releaseContainers);
|
||||
}
|
||||
|
@ -45,43 +45,11 @@ public interface StartContainerResponse {
|
||||
Map<String, ByteBuffer> getAllServiceResponse();
|
||||
|
||||
/**
|
||||
* Get the response from a single auxiliary service running on the
|
||||
* <code>NodeManager</code>
|
||||
*
|
||||
* @param key The auxiliary service name whose response is desired.
|
||||
* @return The opaque blob <code>ByteBuffer</code> returned by the auxiliary
|
||||
* service.
|
||||
*/
|
||||
ByteBuffer getServiceResponse(String key);
|
||||
|
||||
/**
|
||||
* Add to the list of auxiliary services which have been started on the
|
||||
* Set to the list of auxiliary services which have been started on the
|
||||
* <code>NodeManager</code>. This is done only once when the
|
||||
* <code>NodeManager</code> starts up
|
||||
* @param serviceResponse A map from auxiliary service names to the opaque
|
||||
* @param serviceResponses A map from auxiliary service names to the opaque
|
||||
* blob <code>ByteBuffer</code>s for that auxiliary service
|
||||
*/
|
||||
void addAllServiceResponse(Map<String, ByteBuffer> serviceResponse);
|
||||
|
||||
/**
|
||||
* Add to the list of auxiliary services which have been started on the
|
||||
* <code>NodeManager</code>. This is done only once when the
|
||||
* <code>NodeManager</code> starts up
|
||||
*
|
||||
* @param key The auxiliary service name
|
||||
* @param value The opaque blob <code>ByteBuffer</code> managed by the
|
||||
* auxiliary service
|
||||
*/
|
||||
void setServiceResponse(String key, ByteBuffer value);
|
||||
|
||||
/**
|
||||
* Remove a single auxiliary service from the StartContainerResponse object
|
||||
* @param key The auxiliary service to remove
|
||||
*/
|
||||
void removeServiceResponse(String key);
|
||||
|
||||
/**
|
||||
* Remove all the auxiliary services from the StartContainerResponse object
|
||||
*/
|
||||
void clearServiceResponse();
|
||||
void setAllServiceResponse(Map<String, ByteBuffer> serviceResponses);
|
||||
}
|
||||
|
@ -25,7 +25,6 @@
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
@ -144,14 +143,13 @@ public List<ResourceRequest> getAskList() {
|
||||
return this.ask;
|
||||
}
|
||||
@Override
|
||||
public ResourceRequest getAsk(int index) {
|
||||
public void setAskList(final List<ResourceRequest> resourceRequests) {
|
||||
if(resourceRequests == null) {
|
||||
return;
|
||||
}
|
||||
initAsks();
|
||||
return this.ask.get(index);
|
||||
}
|
||||
@Override
|
||||
public int getAskCount() {
|
||||
initAsks();
|
||||
return this.ask.size();
|
||||
this.ask.clear();
|
||||
this.ask.addAll(resourceRequests);
|
||||
}
|
||||
|
||||
private void initAsks() {
|
||||
@ -167,14 +165,6 @@ private void initAsks() {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAllAsks(final List<ResourceRequest> ask) {
|
||||
if (ask == null)
|
||||
return;
|
||||
initAsks();
|
||||
this.ask.addAll(ask);
|
||||
}
|
||||
|
||||
private void addAsksToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearAsk();
|
||||
@ -209,34 +199,18 @@ public void remove() {
|
||||
builder.addAllAsk(iterable);
|
||||
}
|
||||
@Override
|
||||
public void addAsk(ResourceRequest ask) {
|
||||
initAsks();
|
||||
this.ask.add(ask);
|
||||
}
|
||||
@Override
|
||||
public void removeAsk(int index) {
|
||||
initAsks();
|
||||
this.ask.remove(index);
|
||||
}
|
||||
@Override
|
||||
public void clearAsks() {
|
||||
initAsks();
|
||||
this.ask.clear();
|
||||
}
|
||||
@Override
|
||||
public List<ContainerId> getReleaseList() {
|
||||
initReleases();
|
||||
return this.release;
|
||||
}
|
||||
@Override
|
||||
public ContainerId getRelease(int index) {
|
||||
public void setReleaseList(List<ContainerId> releaseContainers) {
|
||||
if(releaseContainers == null) {
|
||||
return;
|
||||
}
|
||||
initReleases();
|
||||
return this.release.get(index);
|
||||
}
|
||||
@Override
|
||||
public int getReleaseCount() {
|
||||
initReleases();
|
||||
return this.release.size();
|
||||
this.release.clear();
|
||||
this.release.addAll(releaseContainers);
|
||||
}
|
||||
|
||||
private void initReleases() {
|
||||
@ -252,14 +226,6 @@ private void initReleases() {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAllReleases(final List<ContainerId> release) {
|
||||
if (release == null)
|
||||
return;
|
||||
initReleases();
|
||||
this.release.addAll(release);
|
||||
}
|
||||
|
||||
private void addReleasesToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearRelease();
|
||||
@ -293,21 +259,6 @@ public void remove() {
|
||||
};
|
||||
builder.addAllRelease(iterable);
|
||||
}
|
||||
@Override
|
||||
public void addRelease(ContainerId release) {
|
||||
initReleases();
|
||||
this.release.add(release);
|
||||
}
|
||||
@Override
|
||||
public void removeRelease(int index) {
|
||||
initReleases();
|
||||
this.release.remove(index);
|
||||
}
|
||||
@Override
|
||||
public void clearReleases() {
|
||||
initReleases();
|
||||
this.release.clear();
|
||||
}
|
||||
|
||||
private ApplicationAttemptIdPBImpl convertFromProtoFormat(ApplicationAttemptIdProto p) {
|
||||
return new ApplicationAttemptIdPBImpl(p);
|
||||
|
@ -84,9 +84,14 @@ public synchronized Map<String, ByteBuffer> getAllServiceResponse() {
|
||||
return this.serviceResponse;
|
||||
}
|
||||
@Override
|
||||
public synchronized ByteBuffer getServiceResponse(String key) {
|
||||
public synchronized void setAllServiceResponse(
|
||||
Map<String, ByteBuffer> serviceResponses) {
|
||||
if(serviceResponses == null) {
|
||||
return;
|
||||
}
|
||||
initServiceResponse();
|
||||
return this.serviceResponse.get(key);
|
||||
this.serviceResponse.clear();
|
||||
this.serviceResponse.putAll(serviceResponses);
|
||||
}
|
||||
|
||||
private synchronized void initServiceResponse() {
|
||||
@ -102,14 +107,6 @@ private synchronized void initServiceResponse() {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void addAllServiceResponse(final Map<String, ByteBuffer> serviceResponse) {
|
||||
if (serviceResponse == null)
|
||||
return;
|
||||
initServiceResponse();
|
||||
this.serviceResponse.putAll(serviceResponse);
|
||||
}
|
||||
|
||||
private synchronized void addServiceResponseToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearServiceResponse();
|
||||
@ -143,19 +140,4 @@ public synchronized boolean hasNext() {
|
||||
};
|
||||
builder.addAllServiceResponse(iterable);
|
||||
}
|
||||
@Override
|
||||
public synchronized void setServiceResponse(String key, ByteBuffer val) {
|
||||
initServiceResponse();
|
||||
this.serviceResponse.put(key, val);
|
||||
}
|
||||
@Override
|
||||
public synchronized void removeServiceResponse(String key) {
|
||||
initServiceResponse();
|
||||
this.serviceResponse.remove(key);
|
||||
}
|
||||
@Override
|
||||
public synchronized void clearServiceResponse() {
|
||||
initServiceResponse();
|
||||
this.serviceResponse.clear();
|
||||
}
|
||||
}
|
||||
|
@ -393,8 +393,8 @@ public static AllocateRequest newAllocateRequest(
|
||||
allocateRequest.setApplicationAttemptId(applicationAttemptId);
|
||||
allocateRequest.setResponseId(responseID);
|
||||
allocateRequest.setProgress(appProgress);
|
||||
allocateRequest.addAllAsks(resourceAsk);
|
||||
allocateRequest.addAllReleases(containersToBeReleased);
|
||||
allocateRequest.setAskList(resourceAsk);
|
||||
allocateRequest.setReleaseList(containersToBeReleased);
|
||||
return allocateRequest;
|
||||
}
|
||||
|
||||
|
@ -468,7 +468,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
|
||||
StartContainerResponse response =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
response.addAllServiceResponse(auxiliaryServices.getMeta());
|
||||
response.setAllServiceResponse(auxiliaryServices.getMeta());
|
||||
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
|
||||
// launch. A finished Application will not launch containers.
|
||||
metrics.launchedContainer();
|
||||
|
@ -497,7 +497,7 @@ private Container requestAndGetContainer(AMRMProtocol scheduler,
|
||||
.getAllocatedContainers();
|
||||
|
||||
// Modify ask to request no more.
|
||||
allocateRequest.clearAsks();
|
||||
allocateRequest.setAskList(new ArrayList<ResourceRequest>());
|
||||
|
||||
int waitCounter = 0;
|
||||
while ((allocatedContainers == null || allocatedContainers.size() == 0)
|
||||
|
Loading…
Reference in New Issue
Block a user