MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du

This commit is contained in:
Jason Lowe 2014-11-13 15:42:25 +00:00
parent 7dae5b5a88
commit 177e8090f5
2 changed files with 33 additions and 9 deletions

View File

@ -465,6 +465,9 @@ Release 2.6.0 - 2014-11-15
MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
(Emilio Coppa and jlowe via kihwal)
MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused
correctly (Junping Du via jlowe)
Release 2.5.2 - 2014-11-10
INCOMPATIBLE CHANGES

View File

@ -407,7 +407,7 @@ private void openConnectionWithRetry(MapHost host,
}
if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
LOG.warn("Failed to connect to host: " + url + "after "
+ fetchRetryTimeout + "milliseconds.");
+ fetchRetryTimeout + " milliseconds.");
throw e;
}
try {
@ -596,7 +596,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
} else {
// timeout, prepare to be failed.
LOG.warn("Timeout for copying MapOutput with retry on host " + host
+ "after " + fetchRetryTimeout + "milliseconds.");
+ "after " + fetchRetryTimeout + " milliseconds.");
}
}
@ -678,28 +678,49 @@ private void connect(URLConnection connection, int connectionTimeout)
} else if (connectionTimeout > 0) {
unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
}
long startTime = Time.monotonicNow();
long lastTime = startTime;
int attempts = 0;
// set the connect timeout to the unit-connect-timeout
connection.setConnectTimeout(unit);
while (true) {
try {
attempts++;
connection.connect();
break;
} catch (IOException ioe) {
// update the total remaining connect-timeout
connectionTimeout -= unit;
long currentTime = Time.monotonicNow();
long retryTime = currentTime - startTime;
long leftTime = connectionTimeout - retryTime;
long timeSinceLastIteration = currentTime - lastTime;
// throw an exception if we have waited for timeout amount of time
// note that the updated value if timeout is used here
if (connectionTimeout == 0) {
if (leftTime <= 0) {
int retryTimeInSeconds = (int) retryTime/1000;
LOG.error("Connection retry failed with " + attempts +
" attempts in " + retryTimeInSeconds + " seconds");
throw ioe;
}
// reset the connect timeout for the last try
if (connectionTimeout < unit) {
unit = connectionTimeout;
if (leftTime < unit) {
unit = (int)leftTime;
// reset the connect time out for the final connect
connection.setConnectTimeout(unit);
}
if (timeSinceLastIteration < unit) {
try {
// sleep the left time of unit
sleep(unit - timeSinceLastIteration);
} catch (InterruptedException e) {
LOG.warn("Sleep in connection retry get interrupted.");
if (stopped) {
return;
}
}
}
// update the total remaining connect-timeout
lastTime = Time.monotonicNow();
}
}
}