HDFS-3891. Make selectInputStreams throw IOE instead of RTE. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1381481 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-09-06 06:57:18 +00:00
parent 99ec5bd8d3
commit 437948ea1c
7 changed files with 40 additions and 21 deletions

View File

@ -44,3 +44,5 @@ HDFS-3869. Expose non-file journal manager details in web UI (todd)
HDFS-3884. Journal format() should reset cached values (todd) HDFS-3884. Journal format() should reset cached values (todd)
HDFS-3870. Add metrics to JournalNode (todd) HDFS-3870. Add metrics to JournalNode (todd)
HDFS-3891. Make selectInputStreams throw IOE instead of RTE (todd)

View File

@ -158,9 +158,11 @@ class AsyncLoggerSet {
timeoutMs); timeoutMs);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting for quorum results"); throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
"quorum of nodes to respond.");
} catch (TimeoutException e) { } catch (TimeoutException e) {
throw new IOException("Timed out waiting " + timeoutMs + " for write quorum"); throw new IOException("Timed out waiting " + timeoutMs + "ms for a " +
"quorum of nodes to respond.");
} }
if (q.countSuccesses() < majority) { if (q.countSuccesses() < majority) {

View File

@ -394,17 +394,12 @@ public class QuorumJournalManager implements JournalManager {
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) { long fromTxnId, boolean inProgressOk) throws IOException {
QuorumCall<AsyncLogger, RemoteEditLogManifest> q = QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId); loggers.getEditLogManifest(fromTxnId);
Map<AsyncLogger, RemoteEditLogManifest> resps; Map<AsyncLogger, RemoteEditLogManifest> resps =
try { loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
} catch (IOException ioe) {
// TODO: can we do better here?
throw new RuntimeException(ioe);
}
LOG.debug("selectInputStream manifests:\n" + LOG.debug("selectInputStream manifests:\n" +
Joiner.on("\n").withKeyValueSeparator(": ").join(resps)); Joiner.on("\n").withKeyValueSeparator(": ").join(resps));

View File

@ -242,15 +242,8 @@ public class FileJournalManager implements JournalManager {
@Override @Override
synchronized public void selectInputStreams( synchronized public void selectInputStreams(
Collection<EditLogInputStream> streams, long fromTxId, Collection<EditLogInputStream> streams, long fromTxId,
boolean inProgressOk) { boolean inProgressOk) throws IOException {
List<EditLogFile> elfs; List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
try {
elfs = matchEditLogs(sd.getCurrentDir());
} catch (IOException e) {
LOG.error("error listing files in " + this + ". " +
"Skipping all edit logs in this directory.", e);
return;
}
LOG.debug(this + ": selecting input streams starting at " + fromTxId + LOG.debug(this + ": selecting input streams starting at " + fromTxId +
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") + (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)"); "from among " + elfs.size() + " candidate file(s)");

View File

@ -65,9 +65,11 @@ public interface JournalManager extends Closeable, FormatConfirmable,
* @param inProgressOk whether or not in-progress streams should be returned * @param inProgressOk whether or not in-progress streams should be returned
* *
* @return a list of streams * @return a list of streams
* @throws IOException if the underlying storage has an error or is otherwise
* inaccessible
*/ */
void selectInputStreams(Collection<EditLogInputStream> streams, void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk); long fromTxnId, boolean inProgressOk) throws IOException;
/** /**
* Set the amount of memory that this stream should use to buffer edits * Set the amount of memory that this stream should use to buffer edits

View File

@ -247,7 +247,12 @@ public class JournalSet implements JournalManager {
LOG.info("Skipping jas " + jas + " since it's disabled"); LOG.info("Skipping jas " + jas + " since it's disabled");
continue; continue;
} }
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk); try {
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
} catch (IOException ioe) {
LOG.warn("Unable to determine input streams from " + jas.getManager() +
". Skipping.", ioe);
}
} }
chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk); chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk);
} }

View File

@ -211,6 +211,26 @@ public class TestQuorumJournalManager {
} }
} }
/**
* Regression test for HDFS-3891: selectInputStreams should throw
* an exception when a majority of journalnodes have crashed.
*/
@Test
public void testSelectInputStreamsMajorityDown() throws Exception {
// Shut down all of the JNs.
cluster.shutdown();
List<EditLogInputStream> streams = Lists.newArrayList();
try {
qjm.selectInputStreams(streams, 0, false);
fail("Did not throw IOE");
} catch (QuorumException ioe) {
GenericTestUtils.assertExceptionContains(
"Got too many exceptions", ioe);
assertTrue(streams.isEmpty());
}
}
/** /**
* Test the case where the NN crashes after starting a new segment * Test the case where the NN crashes after starting a new segment
* on all nodes, but before writing the first transaction to it. * on all nodes, but before writing the first transaction to it.