YARN-6978. Add updateContainer API to NMClient. (Kartheek Muthyala via asuresh)

This commit is contained in:
Arun Suresh 2017-09-07 10:23:12 -07:00
parent 13eda50003
commit c41118a7f8
8 changed files with 227 additions and 33 deletions

View File

@ -1026,10 +1026,6 @@ public void onContainerStarted(ContainerId containerId,
} }
} }
@Override
public void onContainerResourceIncreased(
ContainerId containerId, Resource resource) {}
@Override @Override
public void onStartContainerError(ContainerId containerId, Throwable t) { public void onStartContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to start Container " + containerId, t); LOG.error("Failed to start Container " + containerId, t);
@ -1050,10 +1046,25 @@ public void onStopContainerError(ContainerId containerId, Throwable t) {
containers.remove(containerId); containers.remove(containerId);
} }
@Deprecated
@Override @Override
public void onIncreaseContainerResourceError( public void onIncreaseContainerResourceError(
ContainerId containerId, Throwable t) {} ContainerId containerId, Throwable t) {}
@Deprecated
@Override
public void onContainerResourceIncreased(
ContainerId containerId, Resource resource) {}
@Override
public void onUpdateContainerResourceError(
ContainerId containerId, Throwable t) {
}
@Override
public void onContainerResourceUpdated(ContainerId containerId,
Resource resource) {
}
} }
/** /**

View File

@ -104,9 +104,27 @@ public abstract Map<String, ByteBuffer> startContainer(Container container,
* @throws YarnException YarnException. * @throws YarnException YarnException.
* @throws IOException IOException. * @throws IOException IOException.
*/ */
@Deprecated
public abstract void increaseContainerResource(Container container) public abstract void increaseContainerResource(Container container)
throws YarnException, IOException; throws YarnException, IOException;
/**
* <p>Update the resources of a container.</p>
*
* <p>The <code>ApplicationMaster</code> or other applications that use the
* client must provide the details of the container, including the Id and
* the target resource encapsulated in the updated container token via
* {@link Container}.
* </p>
*
* @param container the container with updated token.
*
* @throws YarnException YarnException.
* @throws IOException IOException.
*/
public abstract void updateContainerResource(Container container)
throws YarnException, IOException;
/** /**
* <p>Stop an started container.</p> * <p>Stop an started container.</p>
* *

View File

@ -177,8 +177,22 @@ protected NMClientAsync(String name, NMClient client,
public abstract void startContainerAsync( public abstract void startContainerAsync(
Container container, ContainerLaunchContext containerLaunchContext); Container container, ContainerLaunchContext containerLaunchContext);
@Deprecated
public abstract void increaseContainerResourceAsync(Container container); public abstract void increaseContainerResourceAsync(Container container);
/**
* <p>Update the resources of a container.</p>
*
* <p>The <code>ApplicationMaster</code> or other applications that use the
* client must provide the details of the container, including the Id and
* the target resource encapsulated in the updated container token via
* {@link Container}.
* </p>
*
* @param container the container with updated token.
*/
public abstract void updateContainerResourceAsync(Container container);
/** /**
* <p>Re-Initialize the Container.</p> * <p>Re-Initialize the Container.</p>
* *
@ -301,9 +315,20 @@ public abstract void onStartContainerError(
* @param containerId the Id of the container * @param containerId the Id of the container
* @param resource the target resource of the container * @param resource the target resource of the container
*/ */
@Deprecated
public abstract void onContainerResourceIncreased( public abstract void onContainerResourceIncreased(
ContainerId containerId, Resource resource); ContainerId containerId, Resource resource);
/**
* The API is called when <code>NodeManager</code> responds to indicate
* the container resource has been successfully updated.
*
* @param containerId the Id of the container
* @param resource the target resource of the container
*/
public abstract void onContainerResourceUpdated(
ContainerId containerId, Resource resource);
/** /**
* The API is called when an exception is raised in the process of * The API is called when an exception is raised in the process of
* querying the status of a container. * querying the status of a container.
@ -321,9 +346,20 @@ public abstract void onGetContainerStatusError(
* @param containerId the Id of the container * @param containerId the Id of the container
* @param t the raised exception * @param t the raised exception
*/ */
@Deprecated
public abstract void onIncreaseContainerResourceError( public abstract void onIncreaseContainerResourceError(
ContainerId containerId, Throwable t); ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* updating container resource.
*
* @param containerId the Id of the container
* @param t the raised exception
*/
public abstract void onUpdateContainerResourceError(
ContainerId containerId, Throwable t);
/** /**
* The API is called when an exception is raised in the process of * The API is called when an exception is raised in the process of
* stopping a container. * stopping a container.

View File

@ -259,6 +259,7 @@ public void startContainerAsync(
} }
} }
@Deprecated
public void increaseContainerResourceAsync(Container container) { public void increaseContainerResourceAsync(Container container) {
if (!(callbackHandler instanceof AbstractCallbackHandler)) { if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container resource " LOG.error("Callback handler does not implement container resource "
@ -274,7 +275,7 @@ public void increaseContainerResourceAsync(Container container) {
" is neither started nor scheduled to start")); " is neither started nor scheduled to start"));
} }
try { try {
events.put(new IncreaseContainerResourceEvent(container)); events.put(new UpdateContainerResourceEvent(container, true));
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of increasing resource of " LOG.warn("Exception when scheduling the event of increasing resource of "
+ "Container " + container.getId()); + "Container " + container.getId());
@ -282,6 +283,30 @@ public void increaseContainerResourceAsync(Container container) {
} }
} }
@Override
public void updateContainerResourceAsync(Container container) {
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container resource "
+ "increase callback methods");
return;
}
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
if (containers.get(container.getId()) == null) {
handler.onUpdateContainerResourceError(
container.getId(),
RPCUtil.getRemoteException(
"Container " + container.getId() +
" is neither started nor scheduled to start"));
}
try {
events.put(new UpdateContainerResourceEvent(container, false));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of increasing resource of "
+ "Container " + container.getId());
handler.onUpdateContainerResourceError(container.getId(), e);
}
}
@Override @Override
public void reInitializeContainerAsync(ContainerId containerId, public void reInitializeContainerAsync(ContainerId containerId,
ContainerLaunchContext containerLaunchContex, boolean autoCommit){ ContainerLaunchContext containerLaunchContex, boolean autoCommit){
@ -427,7 +452,7 @@ protected enum ContainerEventType {
START_CONTAINER, START_CONTAINER,
STOP_CONTAINER, STOP_CONTAINER,
QUERY_CONTAINER, QUERY_CONTAINER,
INCREASE_CONTAINER_RESOURCE, UPDATE_CONTAINER_RESOURCE,
REINITIALIZE_CONTAINER, REINITIALIZE_CONTAINER,
RESTART_CONTAINER, RESTART_CONTAINER,
ROLLBACK_LAST_REINIT, ROLLBACK_LAST_REINIT,
@ -503,14 +528,20 @@ public boolean isAutoCommit() {
} }
} }
protected static class IncreaseContainerResourceEvent extends ContainerEvent { protected static class UpdateContainerResourceEvent extends ContainerEvent {
private Container container; private Container container;
private boolean isIncreaseEvent;
public IncreaseContainerResourceEvent(Container container) { // UpdateContainerResourceEvent constructor takes in a
// flag to support callback API's calling through the deprecated
// increaseContainerResource
public UpdateContainerResourceEvent(Container container,
boolean isIncreaseEvent) {
super(container.getId(), container.getNodeId(), super(container.getId(), container.getNodeId(),
container.getContainerToken(), container.getContainerToken(),
ContainerEventType.INCREASE_CONTAINER_RESOURCE); ContainerEventType.UPDATE_CONTAINER_RESOURCE);
this.container = container; this.container = container;
this.isIncreaseEvent = isIncreaseEvent;
} }
public Container getContainer() { public Container getContainer() {
@ -536,8 +567,8 @@ ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
// Transitions from RUNNING state // Transitions from RUNNING state
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING, .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.INCREASE_CONTAINER_RESOURCE, ContainerEventType.UPDATE_CONTAINER_RESOURCE,
new IncreaseContainerResourceTransition()) new UpdateContainerResourceTransition())
// Transitions for Container Upgrade // Transitions for Container Upgrade
.addTransition(ContainerState.RUNNING, .addTransition(ContainerState.RUNNING,
@ -566,7 +597,7 @@ ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
.addTransition(ContainerState.DONE, ContainerState.DONE, .addTransition(ContainerState.DONE, ContainerState.DONE,
EnumSet.of(ContainerEventType.START_CONTAINER, EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER, ContainerEventType.STOP_CONTAINER,
ContainerEventType.INCREASE_CONTAINER_RESOURCE)) ContainerEventType.UPDATE_CONTAINER_RESOURCE))
// Transition from FAILED state // Transition from FAILED state
.addTransition(ContainerState.FAILED, ContainerState.FAILED, .addTransition(ContainerState.FAILED, ContainerState.FAILED,
@ -576,7 +607,7 @@ ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
ContainerEventType.RESTART_CONTAINER, ContainerEventType.RESTART_CONTAINER,
ContainerEventType.COMMIT_LAST_REINT, ContainerEventType.COMMIT_LAST_REINT,
ContainerEventType.ROLLBACK_LAST_REINIT, ContainerEventType.ROLLBACK_LAST_REINIT,
ContainerEventType.INCREASE_CONTAINER_RESOURCE)); ContainerEventType.UPDATE_CONTAINER_RESOURCE));
protected static class StartContainerTransition implements protected static class StartContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent, MultipleArcTransition<StatefulContainer, ContainerEvent,
@ -628,46 +659,61 @@ private ContainerState onExceptionRaised(StatefulContainer container,
} }
} }
protected static class IncreaseContainerResourceTransition implements protected static class UpdateContainerResourceTransition implements
SingleArcTransition<StatefulContainer, ContainerEvent> { SingleArcTransition<StatefulContainer, ContainerEvent> {
@SuppressWarnings("deprecation")
@Override @Override
public void transition( public void transition(
StatefulContainer container, ContainerEvent event) { StatefulContainer container, ContainerEvent event) {
boolean isIncreaseEvent = false;
if (!(container.nmClientAsync.getCallbackHandler() if (!(container.nmClientAsync.getCallbackHandler()
instanceof AbstractCallbackHandler)) { instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container resource " LOG.error("Callback handler does not implement container resource "
+ "increase callback methods"); + "update callback methods");
return; return;
} }
AbstractCallbackHandler handler = AbstractCallbackHandler handler =
(AbstractCallbackHandler) container.nmClientAsync (AbstractCallbackHandler) container.nmClientAsync
.getCallbackHandler(); .getCallbackHandler();
try { try {
if (!(event instanceof IncreaseContainerResourceEvent)) { if (!(event instanceof UpdateContainerResourceEvent)) {
throw new AssertionError("Unexpected event type. Expecting:" throw new AssertionError("Unexpected event type. Expecting:"
+ "IncreaseContainerResourceEvent. Got:" + event); + "UpdateContainerResourceEvent. Got:" + event);
} }
IncreaseContainerResourceEvent increaseEvent = UpdateContainerResourceEvent updateEvent =
(IncreaseContainerResourceEvent) event; (UpdateContainerResourceEvent) event;
container.nmClientAsync.getClient().increaseContainerResource( container.nmClientAsync.getClient().updateContainerResource(
increaseEvent.getContainer()); updateEvent.getContainer());
isIncreaseEvent = updateEvent.isIncreaseEvent;
try { try {
handler.onContainerResourceIncreased( //If isIncreaseEvent is set, set the appropriate callbacks
increaseEvent.getContainerId(), increaseEvent.getContainer() //for backward compatibility
.getResource()); if (isIncreaseEvent) {
handler.onContainerResourceIncreased(updateEvent.getContainerId(),
updateEvent.getContainer().getResource());
} else {
handler.onContainerResourceUpdated(updateEvent.getContainerId(),
updateEvent.getContainer().getResource());
}
} catch (Throwable thr) { } catch (Throwable thr) {
// Don't process user created unchecked exception // Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from " LOG.info("Unchecked exception is thrown from "
+ "onContainerResourceIncreased for Container " + "onContainerResourceUpdated for Container "
+ event.getContainerId(), thr); + event.getContainerId(), thr);
} }
} catch (Exception e) { } catch (Exception e) {
try { try {
handler.onIncreaseContainerResourceError(event.getContainerId(), e); if (isIncreaseEvent) {
handler
.onIncreaseContainerResourceError(event.getContainerId(), e);
} else {
handler.onUpdateContainerResourceError(event.getContainerId(), e);
}
} catch (Throwable thr) { } catch (Throwable thr) {
// Don't process user created unchecked exception // Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from " LOG.info("Unchecked exception is thrown from "
+ "onIncreaseContainerResourceError for Container " + "onUpdateContainerResourceError for Container "
+ event.getContainerId(), thr); + event.getContainerId(), thr);
} }
} }

View File

@ -230,6 +230,7 @@ public Map<String, ByteBuffer> startContainer(
} }
} }
@Deprecated
@Override @Override
public void increaseContainerResource(Container container) public void increaseContainerResource(Container container)
throws YarnException, IOException { throws YarnException, IOException {
@ -258,6 +259,34 @@ public void increaseContainerResource(Container container)
} }
} }
@Override
public void updateContainerResource(Container container)
throws YarnException, IOException {
ContainerManagementProtocolProxyData proxy = null;
try {
proxy =
cmProxy.getProxy(container.getNodeId().toString(), container.getId());
List<Token> updateTokens = new ArrayList<>();
updateTokens.add(container.getContainerToken());
ContainerUpdateRequest request =
ContainerUpdateRequest.newInstance(updateTokens);
ContainerUpdateResponse response =
proxy.getContainerManagementProtocol().updateContainer(request);
if (response.getFailedRequests() != null && response.getFailedRequests()
.containsKey(container.getId())) {
Throwable t =
response.getFailedRequests().get(container.getId()).deSerialize();
parseAndThrowException(t);
}
} finally {
if (proxy != null) {
cmProxy.mayBeCloseProxy(proxy);
}
}
}
@Override @Override
public void stopContainer(ContainerId containerId, NodeId nodeId) public void stopContainer(ContainerId containerId, NodeId nodeId)
throws YarnException, IOException { throws YarnException, IOException {

View File

@ -253,7 +253,7 @@ public void onContainerStarted(ContainerId containerId,
int t = containerId.getId() % 5; int t = containerId.getId() % 5;
switch (t) { switch (t) {
case 0: case 0:
asyncClient.increaseContainerResourceAsync(container); asyncClient.updateContainerResourceAsync(container);
break; break;
case 1: case 1:
asyncClient.reInitializeContainerAsync(containerId, asyncClient.reInitializeContainerAsync(containerId,
@ -295,7 +295,7 @@ public void onContainerStatusReceived(ContainerId containerId,
// containerId // containerId
Container container = Container.newInstance( Container container = Container.newInstance(
containerId, nodeId, null, null, null, containerToken); containerId, nodeId, null, null, null, containerToken);
asyncClient.increaseContainerResourceAsync(container); asyncClient.updateContainerResourceAsync(container);
// Shouldn't crash the test thread // Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception"); throw new RuntimeException("Ignorable Exception");
@ -320,6 +320,25 @@ public void onContainerResourceIncreased(
throw new RuntimeException("Ignorable Exception"); throw new RuntimeException("Ignorable Exception");
} }
@SuppressWarnings("deprecation")
@Override
public void onContainerResourceUpdated(ContainerId containerId,
Resource resource) {
if (containerId.getId() >= expectedSuccess) {
errorMsgs.add("Container " + containerId +
" should throw the exception onContainerResourceUpdated");
return;
}
TestData td = testMap.get(OpsToTest.INCR);
td.success.addAndGet(1);
td.successArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.reInitializeContainerAsync(containerId,
Records.newRecord(ContainerLaunchContext.class), true);
// throw a fake user exception, and shouldn't crash the test
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public void onContainerReInitialize(ContainerId containerId) { public void onContainerReInitialize(ContainerId containerId) {
@ -450,6 +469,27 @@ public void onIncreaseContainerResourceError(
throw new RuntimeException("Ignorable Exception"); throw new RuntimeException("Ignorable Exception");
} }
@SuppressWarnings("deprecation")
@Override
public void onUpdateContainerResourceError(ContainerId containerId,
Throwable t) {
if (containerId.getId() < expectedSuccess + expectedFailure) {
errorMsgs.add("Container " + containerId +
" shouldn't throw the exception onUpdatedContainerResourceError");
return;
}
TestData td = testMap.get(OpsToTest.INCR);
td.failure.addAndGet(1);
td.failureArray.set(
containerId.getId() - expectedSuccess - expectedFailure, 1);
// increase container resource error should NOT change the
// the container status to FAILED
// move on to the following failure tests
asyncClient.stopContainerAsync(containerId, nodeId);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public void onContainerReInitializeError(ContainerId containerId, public void onContainerReInitializeError(ContainerId containerId,
@ -673,7 +713,7 @@ private NMClient mockNMClient(int mode)
when(client.getContainerStatus(any(ContainerId.class), when(client.getContainerStatus(any(ContainerId.class),
any(NodeId.class))).thenReturn( any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class)); recordFactory.newRecordInstance(ContainerStatus.class));
doNothing().when(client).increaseContainerResource( doNothing().when(client).updateContainerResource(
any(Container.class)); any(Container.class));
doNothing().when(client).reInitializeContainer( doNothing().when(client).reInitializeContainer(
any(ContainerId.class), any(ContainerLaunchContext.class), any(ContainerId.class), any(ContainerLaunchContext.class),
@ -703,7 +743,7 @@ private NMClient mockNMClient(int mode)
any(NodeId.class))).thenReturn( any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class)); recordFactory.newRecordInstance(ContainerStatus.class));
doThrow(RPCUtil.getRemoteException("Increase Resource Exception")) doThrow(RPCUtil.getRemoteException("Increase Resource Exception"))
.when(client).increaseContainerResource(any(Container.class)); .when(client).updateContainerResource(any(Container.class));
doThrow(RPCUtil.getRemoteException("ReInitialize Exception")) doThrow(RPCUtil.getRemoteException("ReInitialize Exception"))
.when(client).reInitializeContainer( .when(client).reInitializeContainer(
any(ContainerId.class), any(ContainerLaunchContext.class), any(ContainerId.class), any(ContainerLaunchContext.class),
@ -818,10 +858,16 @@ public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) { ContainerStatus containerStatus) {
} }
@Deprecated
@Override @Override
public void onContainerResourceIncreased( public void onContainerResourceIncreased(
ContainerId containerId, Resource resource) {} ContainerId containerId, Resource resource) {}
@Override
public void onContainerResourceUpdated(ContainerId containerId,
Resource resource) {
}
@Override @Override
public void onContainerStopped(ContainerId containerId) { public void onContainerStopped(ContainerId containerId) {
} }
@ -847,10 +893,16 @@ public void onGetContainerStatusError(ContainerId containerId,
Throwable t) { Throwable t) {
} }
@Deprecated
@Override @Override
public void onIncreaseContainerResourceError( public void onIncreaseContainerResourceError(
ContainerId containerId, Throwable t) {} ContainerId containerId, Throwable t) {}
@Override
public void onUpdateContainerResourceError(ContainerId containerId,
Throwable t) {
}
@Override @Override
public void onStopContainerError(ContainerId containerId, Throwable t) { public void onStopContainerError(ContainerId containerId, Throwable t) {
} }

View File

@ -1428,6 +1428,7 @@ public void testAMRMClientWithContainerDemotion()
amClient.ask.clear(); amClient.ask.clear();
} }
@SuppressWarnings("deprecation")
private void updateContainerExecType(AllocateResponse allocResponse, private void updateContainerExecType(AllocateResponse allocResponse,
ExecutionType expectedExecType, NMClientImpl nmClient) ExecutionType expectedExecType, NMClientImpl nmClient)
throws IOException, YarnException { throws IOException, YarnException {

View File

@ -301,10 +301,10 @@ private void testContainerManagement(NMClientImpl nmClient,
assertTrue("The thrown exception is not expected", assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager")); e.getMessage().contains("is not handled by this NodeManager"));
} }
// increaseContainerResource shouldn't be called before startContainer, // upadateContainerResource shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container // otherwise, NodeManager cannot find the container
try { try {
nmClient.increaseContainerResource(container); nmClient.updateContainerResource(container);
fail("Exception is expected"); fail("Exception is expected");
} catch (YarnException e) { } catch (YarnException e) {
assertTrue("The thrown exception is not expected", assertTrue("The thrown exception is not expected",
@ -469,6 +469,7 @@ private void testGetContainerStatus(Container container, int index,
} }
} }
@SuppressWarnings("deprecation")
private void testIncreaseContainerResource(Container container) private void testIncreaseContainerResource(Container container)
throws YarnException, IOException { throws YarnException, IOException {
try { try {