handler) {
-
- executors.putIfAbsent(event, new HashMap<>());
- executors.get(event).putIfAbsent(executor, new ArrayList<>());
-
- executors.get(event)
- .get(executor)
- .add(handler);
+ this.addHandler(event, handler, generateHandlerName(handler));
}
/**
- * Creates one executor with multiple event handlers.
+ * Add new handler to the event queue.
+ *
+ * By default a separated single thread executor will be dedicated to
+ * deliver the events to the registered event handler.
+ *
+ * @param event Triggering event.
+ * @param handler Handler of event (will be called from a separated
+ * thread)
+ * @param handlerName The name of handler (should be unique together with
+ * the event name)
+ * @param The type of the event payload.
+ * @param The type of the event identifier.
*/
- public void addHandlerGroup(String name, HandlerForEvent>...
- eventsAndHandlers) {
- SingleThreadExecutor sharedExecutor =
- new SingleThreadExecutor(name);
- for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
- addHandler(handlerForEvent.event, sharedExecutor,
- handlerForEvent.handler);
- }
+ public > void addHandler(
+ EVENT_TYPE event, EventHandler handler, String handlerName) {
+ validateEvent(event);
+ Preconditions.checkNotNull(handler, "Handler name should not be null.");
+ String executorName =
+ StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
+ + handlerName;
+ this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
+ }
+
+ private > void validateEvent(EVENT_TYPE event) {
+ Preconditions
+ .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
+ "Event name should not contain " + EXECUTOR_NAME_SEPARATOR
+ + " string.");
}
+ private String generateHandlerName(EventHandler handler) {
+ if (!"".equals(handler.getClass().getSimpleName())) {
+ return handler.getClass().getSimpleName();
+ } else {
+ return handler.getClass().getName();
+ }
+ }
+
+ /**
+ * Add event handler with custom executor.
+ *
+ * @param event Triggering event.
+ * @param executor The executor imlementation to deliver events from a
+ * separated threads. Please keep in your mind that
+ * registering metrics is the responsibility of the
+ * caller.
+ * @param handler Handler of event (will be called from a separated
+ * thread)
+ * @param The type of the event payload.
+ * @param The type of the event identifier.
+ */
+ public > void addHandler(
+ EVENT_TYPE event, EventExecutor executor,
+ EventHandler handler) {
+ validateEvent(event);
+ executors.putIfAbsent(event, new HashMap<>());
+ executors.get(event).putIfAbsent(executor, new ArrayList<>());
+
+ executors.get(event).get(executor).add(handler);
+ }
+
+
+
/**
* Route an event with payload to the right listener(s).
*
@@ -183,31 +225,5 @@ public void close() {
});
}
- /**
- * Event identifier together with the handler.
- *
- * @param
- */
- public static class HandlerForEvent {
-
- private final Event event;
-
- private final EventHandler handler;
-
- public HandlerForEvent(
- Event event,
- EventHandler handler) {
- this.event = event;
- this.handler = handler;
- }
-
- public Event getEvent() {
- return event;
- }
-
- public EventHandler getHandler() {
- return handler;
- }
- }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
index 19fddde9b4..8c5605a29b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
@@ -26,12 +26,17 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
import org.apache.hadoop.ozone.lease.LeaseExpiredException;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.map.HashedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,18 +63,39 @@ public abstract class EventWatcher leaseManager;
+ private final EventWatcherMetrics metrics;
+
+ private final String name;
+
protected final Map trackedEventsByUUID =
new ConcurrentHashMap<>();
protected final Set trackedEvents = new HashSet<>();
- public EventWatcher(Event startEvent,
+ private final Map startTrackingTimes = new HashedMap();
+
+ public EventWatcher(String name, Event startEvent,
Event completionEvent,
LeaseManager leaseManager) {
this.startEvent = startEvent;
this.completionEvent = completionEvent;
this.leaseManager = leaseManager;
+ this.metrics = new EventWatcherMetrics();
+ Preconditions.checkNotNull(name);
+ if (name.equals("")) {
+ name = getClass().getSimpleName();
+ }
+ if (name.equals("")) {
+ //for anonymous inner classes
+ name = getClass().getName();
+ }
+ this.name = name;
+ }
+ public EventWatcher(Event startEvent,
+ Event completionEvent,
+ LeaseManager leaseManager) {
+ this("", startEvent, completionEvent, leaseManager);
}
public void start(EventQueue queue) {
@@ -87,11 +113,16 @@ public void start(EventQueue queue) {
}
});
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.register(name, "EventWatcher metrics", metrics);
}
private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
EventPublisher publisher) {
+ metrics.incrementTrackedEvents();
UUID identifier = payload.getUUID();
+ startTrackingTimes.put(identifier, System.currentTimeMillis());
+
trackedEventsByUUID.put(identifier, payload);
trackedEvents.add(payload);
try {
@@ -112,16 +143,21 @@ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
private synchronized void handleCompletion(UUID uuid,
EventPublisher publisher) throws LeaseNotFoundException {
+ metrics.incrementCompletedEvents();
leaseManager.release(uuid);
TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid);
trackedEvents.remove(payload);
+ long originalTime = startTrackingTimes.remove(uuid);
+ metrics.updateFinishingTime(System.currentTimeMillis() - originalTime);
onFinished(publisher, payload);
}
private synchronized void handleTimeout(EventPublisher publisher,
UUID identifier) {
+ metrics.incrementTimedOutEvents();
TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier);
trackedEvents.remove(payload);
+ startTrackingTimes.remove(payload.getUUID());
onTimeout(publisher, payload);
}
@@ -154,4 +190,9 @@ public List getTimeoutEvents(
return trackedEventsByUUID.values().stream().filter(predicate)
.collect(Collectors.toList());
}
+
+ @VisibleForTesting
+ protected EventWatcherMetrics getMetrics() {
+ return metrics;
+ }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java
new file mode 100644
index 0000000000..1db81a9889
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Metrics for any event watcher.
+ */
+public class EventWatcherMetrics {
+
+ @Metric()
+ private MutableCounterLong trackedEvents;
+
+ @Metric()
+ private MutableCounterLong timedOutEvents;
+
+ @Metric()
+ private MutableCounterLong completedEvents;
+
+ @Metric()
+ private MutableRate completionTime;
+
+ public void incrementTrackedEvents() {
+ trackedEvents.incr();
+ }
+
+ public void incrementTimedOutEvents() {
+ timedOutEvents.incr();
+ }
+
+ public void incrementCompletedEvents() {
+ completedEvents.incr();
+ }
+
+ @VisibleForTesting
+ public void updateFinishingTime(long duration) {
+ completionTime.add(duration);
+ }
+
+ @VisibleForTesting
+ public MutableCounterLong getTrackedEvents() {
+ return trackedEvents;
+ }
+
+ @VisibleForTesting
+ public MutableCounterLong getTimedOutEvents() {
+ return timedOutEvents;
+ }
+
+ @VisibleForTesting
+ public MutableCounterLong getCompletedEvents() {
+ return completedEvents;
+ }
+
+ @VisibleForTesting
+ public MutableRate getCompletionTime() {
+ return completionTime;
+ }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
index a64e3d761d..3253f2d5db 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -23,13 +23,18 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* Simple EventExecutor to call all the event handler one-by-one.
*
* @param
*/
+@Metrics(context = "EventQueue")
public class SingleThreadExecutor implements EventExecutor {
public static final String THREAD_NAME_PREFIX = "EventQueue";
@@ -41,14 +46,24 @@ public class SingleThreadExecutor implements EventExecutor {
private final ThreadPoolExecutor executor;
- private final AtomicLong queuedCount = new AtomicLong(0);
+ @Metric
+ private MutableCounterLong queued;
- private final AtomicLong successfulCount = new AtomicLong(0);
+ @Metric
+ private MutableCounterLong done;
- private final AtomicLong failedCount = new AtomicLong(0);
+ @Metric
+ private MutableCounterLong failed;
+ /**
+ * Create SingleThreadExecutor.
+ *
+ * @param name Unique name used in monitoring and metrics.
+ */
public SingleThreadExecutor(String name) {
this.name = name;
+ DefaultMetricsSystem.instance()
+ .register("EventQueue" + name, "Event Executor metrics ", this);
LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>();
executor =
@@ -64,31 +79,31 @@ public SingleThreadExecutor(String name) {
@Override
public void onMessage(EventHandler handler, T message, EventPublisher
publisher) {
- queuedCount.incrementAndGet();
+ queued.incr();
executor.execute(() -> {
try {
handler.onMessage(message, publisher);
- successfulCount.incrementAndGet();
+ done.incr();
} catch (Exception ex) {
LOG.error("Error on execution message {}", message, ex);
- failedCount.incrementAndGet();
+ failed.incr();
}
});
}
@Override
public long failedEvents() {
- return failedCount.get();
+ return failed.value();
}
@Override
public long successfulEvents() {
- return successfulCount.get();
+ return done.value();
}
@Override
public long queuedEvents() {
- return queuedCount.get();
+ return queued.value();
}
@Override
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
index 39444097fe..2bdf705cfe 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -25,6 +25,8 @@
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
/**
* Testing the basic functionality of the event queue.
*/
@@ -44,11 +46,13 @@ public class TestEventQueue {
@Before
public void startEventQueue() {
+ DefaultMetricsSystem.initialize(getClass().getSimpleName());
queue = new EventQueue();
}
@After
public void stopEventQueue() {
+ DefaultMetricsSystem.shutdown();
queue.close();
}
@@ -79,35 +83,4 @@ public void multipleSubscriber() {
}
- @Test
- public void handlerGroup() {
- final long[] result = new long[2];
- queue.addHandlerGroup(
- "group",
- new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
- result[0] = payload),
- new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
- result[1] = payload)
- );
-
- queue.fireEvent(EVENT3, 23L);
- queue.fireEvent(EVENT4, 42L);
-
- queue.processAll(1000);
-
- Assert.assertEquals(23, result[0]);
- Assert.assertEquals(42, result[1]);
-
- Set eventQueueThreadNames =
- Thread.getAllStackTraces().keySet()
- .stream()
- .filter(t -> t.getName().startsWith(SingleThreadExecutor
- .THREAD_NAME_PREFIX))
- .map(Thread::getName)
- .collect(Collectors.toSet());
- System.out.println(eventQueueThreadNames);
- Assert.assertEquals(1, eventQueueThreadNames.size());
-
- }
-
}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
index 1731350cfe..38e15547a8 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
@@ -21,8 +21,13 @@
import java.util.Objects;
import java.util.UUID;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.test.MetricsAsserts;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -46,6 +51,7 @@ public class TestEventWatcher {
@Before
public void startLeaseManager() {
+ DefaultMetricsSystem.instance();
leaseManager = new LeaseManager<>(2000l);
leaseManager.start();
}
@@ -53,12 +59,12 @@ public void startLeaseManager() {
@After
public void stopLeaseManager() {
leaseManager.shutdown();
+ DefaultMetricsSystem.shutdown();
}
@Test
public void testEventHandling() throws InterruptedException {
-
EventQueue queue = new EventQueue();
EventWatcher
@@ -139,26 +145,101 @@ public void testInprogressFilter() throws InterruptedException {
Assert.assertEquals(0, c1todo.size());
Assert.assertFalse(replicationWatcher.contains(event1));
+ }
+ @Test
+ public void testMetrics() throws InterruptedException {
+
+ DefaultMetricsSystem.initialize("test");
+
+ EventQueue queue = new EventQueue();
+
+ EventWatcher
+ replicationWatcher = createEventWatcher();
+
+ EventHandlerStub underReplicatedEvents =
+ new EventHandlerStub<>();
+
+ queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
+
+ replicationWatcher.start(queue);
+
+ //send 3 event to track 3 in-progress activity
+ UnderreplicatedEvent event1 =
+ new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+ UnderreplicatedEvent event2 =
+ new UnderreplicatedEvent(UUID.randomUUID(), "C2");
+
+ UnderreplicatedEvent event3 =
+ new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+ queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
+
+ queue.fireEvent(WATCH_UNDER_REPLICATED, event2);
+
+ queue.fireEvent(WATCH_UNDER_REPLICATED, event3);
+
+ //1st event is completed, don't need to track any more
+ ReplicationCompletedEvent event1Completed =
+ new ReplicationCompletedEvent(event1.UUID, "C1", "D1");
+
+ queue.fireEvent(REPLICATION_COMPLETED, event1Completed);
+
+
+ Thread.sleep(2200l);
+
+ //until now: 3 in-progress activities are tracked with three
+ // UnderreplicatedEvents. The first one is completed, the remaining two
+ // are timed out (as the timeout -- defined in the leasmanager -- is 2000ms.
+
+ EventWatcherMetrics metrics = replicationWatcher.getMetrics();
+
+ //3 events are received
+ Assert.assertEquals(3, metrics.getTrackedEvents().value());
+
+ //one is finished. doesn't need to be resent
+ Assert.assertEquals(1, metrics.getCompletedEvents().value());
+
+ //Other two are timed out and resent
+ Assert.assertEquals(2, metrics.getTimedOutEvents().value());
+
+ DefaultMetricsSystem.shutdown();
}
private EventWatcher
createEventWatcher() {
- return new EventWatcher(
- WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) {
-
- @Override
- void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
- publisher.fireEvent(UNDER_REPLICATED, payload);
- }
-
- @Override
- void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
- //Good job. We did it.
- }
- };
+ return new CommandWatcherExample(WATCH_UNDER_REPLICATED,
+ REPLICATION_COMPLETED, leaseManager);
}
+ private class CommandWatcherExample
+ extends EventWatcher {
+
+ public CommandWatcherExample(Event startEvent,
+ Event completionEvent,
+ LeaseManager leaseManager) {
+ super("TestCommandWatcher", startEvent, completionEvent, leaseManager);
+ }
+
+ @Override
+ void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+ publisher.fireEvent(UNDER_REPLICATED, payload);
+ }
+
+ @Override
+ void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+ //Good job. We did it.
+ }
+
+ @Override
+ public EventWatcherMetrics getMetrics() {
+ return super.getMetrics();
+ }
+ }
+
+ ;
+
private static class ReplicationCompletedEvent
implements IdentifiableEventPayload {
@@ -217,4 +298,4 @@ public UUID getUUID() {
}
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
index 387e399bc1..5d4ab4a681 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
@@ -36,19 +36,9 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,87 +57,6 @@ public final class AMRMClientUtils {
private AMRMClientUtils() {
}
- /**
- * Handle ApplicationNotRegistered exception and re-register.
- *
- * @param appId application Id
- * @param rmProxy RM proxy instance
- * @param registerRequest the AM re-register request
- * @throws YarnException if re-register fails
- */
- public static void handleNotRegisteredExceptionAndReRegister(
- ApplicationId appId, ApplicationMasterProtocol rmProxy,
- RegisterApplicationMasterRequest registerRequest) throws YarnException {
- LOG.info("App attempt {} not registered, most likely due to RM failover. "
- + " Trying to re-register.", appId);
- try {
- rmProxy.registerApplicationMaster(registerRequest);
- } catch (Exception e) {
- if (e instanceof InvalidApplicationMasterRequestException
- && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
- LOG.info("Concurrent thread successfully registered, moving on.");
- } else {
- LOG.error("Error trying to re-register AM", e);
- throw new YarnException(e);
- }
- }
- }
-
- /**
- * Helper method for client calling ApplicationMasterProtocol.allocate that
- * handles re-register if RM fails over.
- *
- * @param request allocate request
- * @param rmProxy RM proxy
- * @param registerRequest the register request for re-register
- * @param appId application id
- * @return allocate response
- * @throws YarnException if RM call fails
- * @throws IOException if RM call fails
- */
- public static AllocateResponse allocateWithReRegister(AllocateRequest request,
- ApplicationMasterProtocol rmProxy,
- RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
- throws YarnException, IOException {
- try {
- return rmProxy.allocate(request);
- } catch (ApplicationMasterNotRegisteredException e) {
- handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
- registerRequest);
- // reset responseId after re-register
- request.setResponseId(0);
- // retry allocate
- return allocateWithReRegister(request, rmProxy, registerRequest, appId);
- }
- }
-
- /**
- * Helper method for client calling
- * ApplicationMasterProtocol.finishApplicationMaster that handles re-register
- * if RM fails over.
- *
- * @param request finishApplicationMaster request
- * @param rmProxy RM proxy
- * @param registerRequest the register request for re-register
- * @param appId application id
- * @return finishApplicationMaster response
- * @throws YarnException if RM call fails
- * @throws IOException if RM call fails
- */
- public static FinishApplicationMasterResponse finishAMWithReRegister(
- FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
- RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
- throws YarnException, IOException {
- try {
- return rmProxy.finishApplicationMaster(request);
- } catch (ApplicationMasterNotRegisteredException ex) {
- handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
- registerRequest);
- // retry finishAM after re-register
- return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
- }
- }
-
/**
* Create a proxy for the specified protocol.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
index e8a7f64a14..0d1a27e289 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
@@ -147,6 +147,11 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
+ public void setAMRegistrationRequest(
+ RegisterApplicationMasterRequest registerRequest) {
+ this.amRegistrationRequest = registerRequest;
+ }
+
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
@@ -259,8 +264,10 @@ public AllocateResponse allocate(AllocateRequest allocateRequest)
}
}
- // re register with RM, then retry allocate recursively
+ // re-register with RM, then retry allocate recursively
registerApplicationMaster(this.amRegistrationRequest);
+ // Reset responseId after re-register
+ allocateRequest.setResponseId(0);
return allocate(allocateRequest);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 02eef29dff..5f9d81b881 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -385,4 +386,19 @@ public boolean hasUAMId(String uamId) {
return this.unmanagedAppMasterMap.containsKey(uamId);
}
+ /**
+ * Return the rmProxy relayer of an UAM.
+ *
+ * @param uamId uam Id
+ * @return the rmProxy relayer
+ * @throws YarnException if fails
+ */
+ public AMRMClientRelayer getAMRMClientRelayer(String uamId)
+ throws YarnException {
+ if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+ throw new YarnException("UAM " + uamId + " does not exist");
+ }
+ return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 73795dcbc7..856a8182dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
@@ -90,7 +91,7 @@ public class UnmanagedApplicationManager {
private BlockingQueue requestQueue;
private AMRequestHandlerThread handlerThread;
- private ApplicationMasterProtocol rmProxy;
+ private AMRMClientRelayer rmProxyRelayer;
private ApplicationId applicationId;
private String submitter;
private String appNameSuffix;
@@ -138,7 +139,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
this.appNameSuffix = appNameSuffix;
this.handlerThread = new AMRequestHandlerThread();
this.requestQueue = new LinkedBlockingQueue<>();
- this.rmProxy = null;
+ this.rmProxyRelayer = null;
this.connectionInitiated = false;
this.registerRequest = null;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@@ -190,8 +191,9 @@ protected void createUAMProxy(Token amrmToken)
throws IOException {
this.userUgi = UserGroupInformation.createProxyUser(
this.applicationId.toString(), UserGroupInformation.getCurrentUser());
- this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
- this.userUgi, amrmToken);
+ this.rmProxyRelayer =
+ new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
+ this.conf, this.userUgi, amrmToken));
}
/**
@@ -209,19 +211,18 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
// Save the register request for re-register later
this.registerRequest = request;
- // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
- // We do not expect application already registered exception here
LOG.info("Registering the Unmanaged application master {}",
this.applicationId);
RegisterApplicationMasterResponse response =
- this.rmProxy.registerApplicationMaster(this.registerRequest);
+ this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
+ this.lastResponseId = 0;
for (Container container : response.getContainersFromPreviousAttempts()) {
- LOG.info("RegisterUAM returned existing running container "
+ LOG.debug("RegisterUAM returned existing running container "
+ container.getId());
}
for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
- LOG.info("RegisterUAM returned existing NM token for node "
+ LOG.debug("RegisterUAM returned existing NM token for node "
+ nmToken.getNodeId());
}
@@ -249,7 +250,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
this.handlerThread.shutdown();
- if (this.rmProxy == null) {
+ if (this.rmProxyRelayer == null) {
if (this.connectionInitiated) {
// This is possible if the async launchUAM is still
// blocked and retrying. Return a dummy response in this case.
@@ -261,8 +262,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
+ "be called before createAndRegister");
}
}
- return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
- this.registerRequest, this.applicationId);
+ return this.rmProxyRelayer.finishApplicationMaster(request);
}
/**
@@ -308,7 +308,7 @@ public void allocateAsync(AllocateRequest request,
//
// In case 2, we have already save the allocate request above, so if the
// registration succeed later, no request is lost.
- if (this.rmProxy == null) {
+ if (this.rmProxyRelayer == null) {
if (this.connectionInitiated) {
LOG.info("Unmanaged AM still not successfully launched/registered yet."
+ " Saving the allocate request and send later.");
@@ -328,6 +328,15 @@ public ApplicationId getAppId() {
return this.applicationId;
}
+ /**
+ * Returns the rmProxy relayer of this UAM.
+ *
+ * @return rmProxy relayer of the UAM
+ */
+ public AMRMClientRelayer getAMRMClientRelayer() {
+ return this.rmProxyRelayer;
+ }
+
/**
* Returns RM proxy for the specified protocol type. Unit test cases can
* override this method and return mock proxy instances.
@@ -592,10 +601,7 @@ public void run() {
}
request.setResponseId(lastResponseId);
-
- AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
- request, rmProxy, registerRequest, applicationId);
-
+ AllocateResponse response = rmProxyRelayer.allocate(request);
if (response == null) {
throw new YarnException("Null allocateResponse from allocate");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 23cd3e2173..9b4d91d3dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -251,8 +251,6 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Registering application attempt: " + attemptId);
- shouldReRegisterNext = false;
-
List containersFromPreviousAttempt = null;
synchronized (applicationContainerIdMap) {
@@ -266,7 +264,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
containersFromPreviousAttempt.add(Container.newInstance(containerId,
null, null, null, null, null));
}
- } else {
+ } else if (!shouldReRegisterNext) {
throw new InvalidApplicationMasterRequestException(
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
}
@@ -276,6 +274,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
}
}
+ shouldReRegisterNext = false;
+
// Make sure we wait for certain test cases last in the method
synchronized (syncObj) {
syncObj.notifyAll();
@@ -339,13 +339,6 @@ public AllocateResponse allocate(AllocateRequest request)
validateRunning();
- if (request.getAskList() != null && request.getAskList().size() > 0
- && request.getReleaseList() != null
- && request.getReleaseList().size() > 0) {
- Assert.fail("The mock RM implementation does not support receiving "
- + "askList and releaseList in the same heartbeat");
- }
-
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Allocate from application attempt: " + attemptId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 57407490c4..645e47e5af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -62,14 +62,15 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
@@ -106,9 +107,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public static final String NMSS_REG_RESPONSE_KEY =
NMSS_CLASS_PREFIX + "registerResponse";
- /*
+ /**
* When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
- * Registry. Otherwise if NM recovery is enabled, the UAM token are store in
+ * Registry. Otherwise if NM recovery is enabled, the UAM token are stored in
* local NMSS instead under this directory name.
*/
public static final String NMSS_SECONDARY_SC_PREFIX =
@@ -119,8 +120,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* The home sub-cluster is the sub-cluster where the AM container is running
* in.
*/
- private ApplicationMasterProtocol homeRM;
+ private AMRMClientRelayer homeRMRelayer;
private SubClusterId homeSubClusterId;
+ private volatile int lastHomeResponseId;
+
+ /**
+ * A flag for work preserving NM restart. If we just recovered, we need to
+ * generate an {@link ApplicationMasterNotRegisteredException} exception back
+ * to AM (similar to what RM will do after its restart/fail-over) in its next
+ * allocate to trigger AM re-register (which we will shield from RM and just
+ * return our saved register response) and a full pending requests re-send, so
+ * that all the {@link AMRMClientRelayer} will be re-populated with all
+ * pending requests.
+ *
+ * TODO: When split-merge is not idempotent, this can lead to some
+ * over-allocation without a full cancel to RM.
+ */
+ private volatile boolean justRecovered;
/**
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@@ -134,6 +150,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private UnmanagedAMPoolManager uamPool;
+ /**
+ * The rmProxy relayers for secondary sub-clusters that keep track of all
+ * pending requests.
+ */
+ private Map secondaryRelayers;
+
/** Thread pool used for asynchronous operations. */
private ExecutorService threadpool;
@@ -186,8 +208,11 @@ public FederationInterceptor() {
this.asyncResponseSink = new ConcurrentHashMap<>();
this.threadpool = Executors.newCachedThreadPool();
this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
+ this.secondaryRelayers = new ConcurrentHashMap<>();
this.amRegistrationRequest = null;
this.amRegistrationResponse = null;
+ this.lastHomeResponseId = Integer.MAX_VALUE;
+ this.justRecovered = false;
}
/**
@@ -224,8 +249,8 @@ public void init(AMRMProxyApplicationContext appContext) {
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
- this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
- this.appOwner);
+ this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
+ ApplicationMasterProtocol.class, this.appOwner));
this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -240,13 +265,12 @@ public void init(AMRMProxyApplicationContext appContext) {
@Override
public void recover(Map recoveredDataMap) {
super.recover(recoveredDataMap);
- LOG.info("Recovering data for FederationInterceptor");
+ ApplicationAttemptId attemptId =
+ getApplicationContext().getApplicationAttemptId();
+ LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
if (recoveredDataMap == null) {
return;
}
-
- ApplicationAttemptId attemptId =
- getApplicationContext().getApplicationAttemptId();
try {
if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
RegisterApplicationMasterRequestProto pb =
@@ -255,6 +279,9 @@ public void recover(Map recoveredDataMap) {
this.amRegistrationRequest =
new RegisterApplicationMasterRequestPBImpl(pb);
LOG.info("amRegistrationRequest recovered for {}", attemptId);
+
+ // Give the register request to homeRMRelayer for future re-registration
+ this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
}
if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
RegisterApplicationMasterResponseProto pb =
@@ -263,6 +290,9 @@ public void recover(Map recoveredDataMap) {
this.amRegistrationResponse =
new RegisterApplicationMasterResponsePBImpl(pb);
LOG.info("amRegistrationResponse recovered for {}", attemptId);
+ // Trigger re-register and full pending re-send only if we have a
+ // saved register response. This should always be true though.
+ this.justRecovered = true;
}
// Recover UAM amrmTokens from registry or NMSS
@@ -309,6 +339,9 @@ public void recover(Map recoveredDataMap) {
getApplicationContext().getUser(), this.homeSubClusterId.getId(),
entry.getValue());
+ this.secondaryRelayers.put(subClusterId.getId(),
+ this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
+
RegisterApplicationMasterResponse response =
this.uamPool.registerApplicationMaster(subClusterId.getId(),
this.amRegistrationRequest);
@@ -436,7 +469,7 @@ public void recover(Map recoveredDataMap) {
* the other sub-cluster RM will be done lazily as needed later.
*/
this.amRegistrationResponse =
- this.homeRM.registerApplicationMaster(request);
+ this.homeRMRelayer.registerApplicationMaster(request);
if (this.amRegistrationResponse
.getContainersFromPreviousAttempts() != null) {
cacheAllocatedContainers(
@@ -495,6 +528,34 @@ public AllocateResponse allocate(AllocateRequest request)
Preconditions.checkArgument(this.policyInterpreter != null,
"Allocate should be called after registerApplicationMaster");
+ if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) {
+ // Save the responseId home RM is expecting
+ this.lastHomeResponseId = request.getResponseId();
+
+ throw new ApplicationMasterNotRegisteredException(
+ "AMRMProxy just restarted and recovered for "
+ + getApplicationContext().getApplicationAttemptId()
+ + ". AM should re-register and full re-send pending requests.");
+ }
+
+ // Override responseId in the request in two cases:
+ //
+ // 1. After we just recovered after an NM restart and AM's responseId is
+ // reset due to the exception we generate. We need to override the
+ // responseId to the one homeRM expects.
+ //
+ // 2. After homeRM fail-over, the allocate response with reseted responseId
+ // might not be returned successfully back to AM because of RPC connection
+ // timeout between AM and AMRMProxy. In this case, we remember and reset the
+ // responseId for AM.
+ if (this.justRecovered
+ || request.getResponseId() > this.lastHomeResponseId) {
+ LOG.warn("Setting allocate responseId for {} from {} to {}",
+ getApplicationContext().getApplicationAttemptId(),
+ request.getResponseId(), this.lastHomeResponseId);
+ request.setResponseId(this.lastHomeResponseId);
+ }
+
try {
// Split the heart beat request into multiple requests, one for each
// sub-cluster RM that is used by this application.
@@ -509,10 +570,18 @@ public AllocateResponse allocate(AllocateRequest request)
sendRequestsToSecondaryResourceManagers(requests);
// Send the request to the home RM and get the response
- AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
- requests.get(this.homeSubClusterId), this.homeRM,
- this.amRegistrationRequest,
- getApplicationContext().getApplicationAttemptId().getApplicationId());
+ AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
+ LOG.info("{} heartbeating to home RM with responseId {}",
+ getApplicationContext().getApplicationAttemptId(),
+ homeRequest.getResponseId());
+
+ AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
+
+ // Reset the flag after the first successful homeRM allocate response,
+ // otherwise keep overriding the responseId of new allocate request
+ if (this.justRecovered) {
+ this.justRecovered = false;
+ }
// Notify policy of home response
try {
@@ -540,6 +609,22 @@ public AllocateResponse allocate(AllocateRequest request)
newRegistrations.getSuccessfulRegistrations());
}
+ LOG.info("{} heartbeat response from home RM with responseId {}",
+ getApplicationContext().getApplicationAttemptId(),
+ homeResponse.getResponseId());
+
+ // Update lastHomeResponseId in three cases:
+ // 1. The normal responseId increments
+ // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
+ // over, AMRMClientRelayer auto re-register and full re-send for homeRM.
+ // 3. lastHomeResponseId == MAX_INT. This is the initial case or
+ // responseId about to overflow and wrap around
+ if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
+ || homeResponse.getResponseId() == 1
+ || this.lastHomeResponseId == Integer.MAX_VALUE) {
+ this.lastHomeResponseId = homeResponse.getResponseId();
+ }
+
// return the final response to the application master.
return homeResponse;
} catch (IOException ex) {
@@ -584,6 +669,16 @@ public FinishApplicationMasterResponseInfo call() throws Exception {
try {
uamResponse =
uamPool.finishApplicationMaster(subClusterId, finishRequest);
+
+ if (uamResponse.getIsUnregistered()) {
+ secondaryRelayers.remove(subClusterId);
+
+ if (getNMStateStore() != null) {
+ getNMStateStore().removeAMRMProxyAppContextEntry(
+ getApplicationContext().getApplicationAttemptId(),
+ NMSS_SECONDARY_SC_PREFIX + subClusterId);
+ }
+ }
} catch (Throwable e) {
LOG.warn("Failed to finish unmanaged application master: "
+ "RM address: " + subClusterId + " ApplicationId: "
@@ -600,9 +695,7 @@ public FinishApplicationMasterResponseInfo call() throws Exception {
// asynchronously by other sub-cluster resource managers, send the same
// request to the home resource manager on this thread.
FinishApplicationMasterResponse homeResponse =
- AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
- this.amRegistrationRequest, getApplicationContext()
- .getApplicationAttemptId().getApplicationId());
+ this.homeRMRelayer.finishApplicationMaster(request);
if (subClusterIds.size() > 0) {
// Wait for other sub-cluster resource managers to return the
@@ -621,10 +714,6 @@ this.amRegistrationRequest, getApplicationContext()
if (uamResponse.getResponse() == null
|| !uamResponse.getResponse().getIsUnregistered()) {
failedToUnRegister = true;
- } else if (getNMStateStore() != null) {
- getNMStateStore().removeAMRMProxyAppContextEntry(
- getApplicationContext().getApplicationAttemptId(),
- NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId());
}
} catch (Throwable e) {
failedToUnRegister = true;
@@ -689,6 +778,11 @@ protected FederationRegistryClient getRegistryClient() {
return this.registryClient;
}
+ @VisibleForTesting
+ protected int getLastHomeResponseId() {
+ return this.lastHomeResponseId;
+ }
+
/**
* Create the UAM pool manager for secondary sub-clsuters. For unit test to
* override.
@@ -800,6 +894,9 @@ public RegisterApplicationMasterResponse call() throws Exception {
getApplicationContext().getUser(), homeSubClusterId.getId(),
amrmToken);
+ secondaryRelayers.put(subClusterId.getId(),
+ uamPool.getAMRMClientRelayer(subClusterId.getId()));
+
response = uamPool.registerApplicationMaster(
subClusterId.getId(), amRegistrationRequest);
@@ -1098,7 +1195,10 @@ public RegisterApplicationMasterResponseInfo call()
token = uamPool.launchUAM(subClusterId, config,
appContext.getApplicationAttemptId().getApplicationId(),
amRegistrationResponse.getQueue(), appContext.getUser(),
- homeSubClusterId.toString(), registryClient != null);
+ homeSubClusterId.toString(), true);
+
+ secondaryRelayers.put(subClusterId,
+ uamPool.getAMRMClientRelayer(subClusterId));
uamResponse = uamPool.registerApplicationMaster(subClusterId,
registerRequest);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 677732d634..2794857c6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -536,6 +537,7 @@ protected ResourceRequest createResourceRequest(String resource,
capability.setMemorySize(memory);
capability.setVirtualCores(vCores);
req.setCapability(capability);
+ req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance());
if (labelExpression != null) {
req.setNodeLabelExpression(labelExpression);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index eefaba1a8b..a837eed14c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
@@ -516,6 +517,22 @@ public Object run() throws Exception {
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+ Assert.assertEquals(Integer.MAX_VALUE,
+ interceptor.getLastHomeResponseId());
+
+ // The first allocate call expects a fail-over exception and re-register
+ int responseId = 10;
+ AllocateRequest allocateRequest =
+ Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(responseId);
+ try {
+ interceptor.allocate(allocateRequest);
+ Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
+ + " after FederationInterceptor restarts and recovers");
+ } catch (ApplicationMasterNotRegisteredException e) {
+ }
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertEquals(responseId, interceptor.getLastHomeResponseId());
// Release all containers
releaseContainersAndAssert(containers);