YARN-4965. Distributed shell AM failed due to ClientHandlerException thrown by jersey. Contributed by Junping Du

This commit is contained in:
Xuan 2016-04-16 19:39:18 -07:00
parent cc8b83a8e8
commit e6c0742012
5 changed files with 67 additions and 10 deletions

View File

@ -77,6 +77,8 @@
import org.codehaus.jackson.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
/**
* The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a
@ -1032,10 +1034,7 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
+ error.getErrorCode());
}
}
} catch (IOException ex) {
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
+ "Server", ex);
} catch (YarnException ex) {
} catch (YarnException | IOException | ClientHandlerException ex) {
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
+ "Server", ex);
}

View File

@ -131,6 +131,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>

View File

@ -104,6 +104,7 @@
import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
/**
* An ApplicationMaster for executing shell commands on a set of launched
@ -1149,13 +1150,14 @@ private void publishContainerStartEvent(
putContainerEntity(timelineClient,
container.getId().getApplicationAttemptId(),
entity));
} catch (YarnException | IOException e) {
} catch (YarnException | IOException | ClientHandlerException e) {
LOG.error("Container start event could not be published for "
+ container.getId().toString(), e);
}
}
private void publishContainerEndEvent(
@VisibleForTesting
void publishContainerEndEvent(
final TimelineClient timelineClient, ContainerStatus container,
String domainId, UserGroupInformation ugi) {
final TimelineEntity entity = new TimelineEntity();
@ -1177,7 +1179,7 @@ private void publishContainerEndEvent(
putContainerEntity(timelineClient,
container.getContainerId().getApplicationAttemptId(),
entity));
} catch (YarnException | IOException e) {
} catch (YarnException | IOException | ClientHandlerException e) {
LOG.error("Container end event could not be published for "
+ container.getContainerId().toString(), e);
}
@ -1212,7 +1214,7 @@ private void publishApplicationAttemptEvent(
try {
TimelinePutResponse response = timelineClient.putEntities(entity);
processTimelineResponseErrors(response);
} catch (YarnException | IOException e) {
} catch (YarnException | IOException | ClientHandlerException e) {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.applications.distributedshell;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
@ -27,6 +31,7 @@
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
@ -46,14 +51,24 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -61,6 +76,7 @@
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
@ -69,6 +85,8 @@
import org.junit.Test;
import org.junit.rules.Timeout;
import com.sun.jersey.api.client.ClientHandlerException;
public class TestDistributedShell {
private static final Log LOG =
@ -77,6 +95,7 @@ public class TestDistributedShell {
protected MiniYARNCluster yarnCluster = null;
protected MiniDFSCluster hdfsCluster = null;
private FileSystem fs = null;
private TimelineWriter spyTimelineWriter;
protected YarnConfiguration conf = null;
private static final int NUM_NMS = 1;
private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
@ -865,6 +884,37 @@ public void testDSShellWithInvalidArgs() throws Exception {
}
}
@Test
public void testDSTimelineClientWithConnectionRefuse() throws Exception {
ApplicationMaster am = new ApplicationMaster();
TimelineClientImpl client = new TimelineClientImpl() {
@Override
protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation authUgi, com.sun.jersey.api.client.Client client,
URI resURI) throws IOException {
TimelineWriter timelineWriter =
new DirectTimelineWriter(authUgi, client, resURI);
spyTimelineWriter = spy(timelineWriter);
return spyTimelineWriter;
}
};
client.init(conf);
client.start();
TestTimelineClient.mockEntityClientResponse(spyTimelineWriter, null,
false, true);
try {
UserGroupInformation ugi = mock(UserGroupInformation.class);
when(ugi.getShortUserName()).thenReturn("user1");
// verify no ClientHandlerException get thrown out.
am.publishContainerEndEvent(client, ContainerStatus.newInstance(
BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "",
1), "domainId", ugi);
} finally {
client.stop();
}
}
protected void waitForNMsToRegister() throws Exception {
int sec = 60;
while (sec >= 0) {

View File

@ -298,7 +298,7 @@ private void assertException(TimelineClientImpl client, RuntimeException ce) {
client.connectionRetry.getRetired());
}
private static ClientResponse mockEntityClientResponse(
public static ClientResponse mockEntityClientResponse(
TimelineWriter spyTimelineWriter, ClientResponse.Status status,
boolean hasError, boolean hasRuntimeError) {
ClientResponse response = mock(ClientResponse.class);