diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d911107e6b..720be02ceb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -53,6 +53,8 @@ Trunk (unreleased changes)
HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
+ HDFS-2158. Add JournalSet to manage the set of journals. (jitendra)
+
BUG FIXES
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
index 5dd82393fd..836c22d014 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
@@ -54,7 +54,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
this.nnRegistration = nnReg;
InetSocketAddress bnAddress =
NetUtils.createSocketAddr(bnRegistration.getAddress());
- Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
try {
this.backupNode =
RPC.getProxy(JournalProtocol.class,
@@ -67,16 +66,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
- @Override // JournalStream
- public String getName() {
- return bnRegistration.getAddress();
- }
-
- @Override // JournalStream
- public JournalType getType() {
- return JournalType.BACKUP;
- }
-
@Override // EditLogOutputStream
void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op);
@@ -141,16 +130,6 @@ protected void flushAndSync() throws IOException {
}
}
- /**
- * There is no persistent storage. Therefore length is 0.
- * Length is used to check when it is large enough to start a checkpoint.
- * This criteria should not be used for backup streams.
- */
- @Override // EditLogOutputStream
- long length() throws IOException {
- return 0;
- }
-
/**
* Get backup node registration.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
index be75f637a9..4780d04b00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
@@ -37,9 +37,7 @@
* stores edits in a local file.
*/
class EditLogFileOutputStream extends EditLogOutputStream {
- private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);;
-
- private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
+ private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
private File file;
private FileOutputStream fp; // file stream for storing edit logs
@@ -73,16 +71,6 @@ class EditLogFileOutputStream extends EditLogOutputStream {
fc.position(fc.size());
}
- @Override // JournalStream
- public String getName() {
- return file.getPath();
- }
-
- @Override // JournalStream
- public JournalType getType() {
- return JournalType.FILE;
- }
-
/** {@inheritDoc} */
@Override
void write(FSEditLogOp op) throws IOException {
@@ -176,7 +164,10 @@ protected void flushAndSync() throws IOException {
if (fp == null) {
throw new IOException("Trying to use aborted output stream");
}
-
+ if (doubleBuf.isFlushed()) {
+ LOG.info("Nothing to flush");
+ return;
+ }
preallocate(); // preallocate file if necessary
doubleBuf.flushTo(fp);
fc.force(false); // metadata updates not needed because of preallocation
@@ -190,16 +181,6 @@ protected void flushAndSync() throws IOException {
public boolean shouldForceSync() {
return doubleBuf.shouldForceSync();
}
-
- /**
- * Return the size of the current edit log including buffered data.
- */
- @Override
- long length() throws IOException {
- // file size - header size + size of both buffers
- return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES +
- doubleBuf.countBufferedBytes();
- }
// allocate a big chunk of data
private void preallocate() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
index 8577db8e45..8681837de5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
@@ -18,23 +18,20 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
-import java.util.zip.Checksum;
import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
/**
* A generic abstract class to support journaling of edits logs into
* a persistent storage.
*/
-abstract class EditLogOutputStream implements JournalStream {
+abstract class EditLogOutputStream {
// these are statistics counters
private long numSync; // number of sync(s) to disk
private long totalTimeSync; // total time to sync
- EditLogOutputStream() throws IOException {
+ EditLogOutputStream() {
numSync = totalTimeSync = 0;
}
@@ -105,12 +102,6 @@ public void flush() throws IOException {
totalTimeSync += (end - start);
}
- /**
- * Return the size of the current edits log.
- * Length is used to check when it is large enough to start a checkpoint.
- */
- abstract long length() throws IOException;
-
/**
* Implement the policy when to automatically sync the buffered edits log
* The buffered edits can be flushed when the buffer becomes full or
@@ -132,12 +123,7 @@ long getTotalSyncTime() {
/**
* Return number of calls to {@link #flushAndSync()}
*/
- long getNumSync() {
+ protected long getNumSync() {
return numSync;
}
-
- @Override // Object
- public String toString() {
- return getName();
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index e355a9d838..4a41a2cbd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,25 +34,17 @@
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
/**
* FSEditLog maintains a log of the namespace modifications.
@@ -62,9 +54,6 @@
@InterfaceStability.Evolving
public class FSEditLog {
- static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
- " File system changes are not persistent. No journal streams.";
-
static final Log LOG = LogFactory.getLog(FSEditLog.class);
/**
@@ -82,10 +71,11 @@ private enum State {
CLOSED;
}
private State state = State.UNINITIALIZED;
+
+ // The set of journals and the output stream for the currently open log segment.
+ final private JournalSet journalSet;
+ private EditLogOutputStream editLogStream = null;
-
- private List<JournalAndStream> journals = Lists.newArrayList();
-
// a monotonically increasing counter that represents transactionIds.
private long txid = 0;
@@ -137,15 +127,15 @@ protected synchronized TransactionId initialValue() {
this.storage = storage;
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = now();
-
+
+ this.journalSet = new JournalSet();
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
- journals.add(new JournalAndStream(new FileJournalManager(sd)));
+ journalSet.add(new FileJournalManager(sd));
}
- if (journals.isEmpty()) {
+ if (journalSet.isEmpty()) {
LOG.error("No edits directories configured!");
- }
-
+ }
state = State.BETWEEN_LOG_SEGMENTS;
}
@@ -172,9 +162,8 @@ synchronized void close() {
LOG.warn("Closing log when already closed", new Exception());
return;
}
-
if (state == State.IN_SEGMENT) {
- assert !journals.isEmpty();
+ assert editLogStream != null;
waitForSyncToFinish();
endCurrentLogSegment(true);
}
@@ -193,20 +182,14 @@ void logEdit(final FSEditLogOp op) {
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
- if (journals.isEmpty()) {
- throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
- }
-
long start = beginTransaction();
op.setTransactionId(txid);
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- if (!jas.isActive()) return;
- jas.stream.write(op);
- }
- }, "logging edit");
+ try {
+ editLogStream.write(op);
+ } catch (IOException ex) {
+ // All journals failed, it is handled in logSync.
+ }
endTransaction(start);
@@ -251,14 +234,7 @@ synchronized void doneWithAutoSyncScheduling() {
* @return true if any of the edit stream says that it should sync
*/
private boolean shouldForceSync() {
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
-
- if (jas.getCurrentStream().shouldForceSync()) {
- return true;
- }
- }
- return false;
+ return editLogStream.shouldForceSync();
}
private long beginTransaction() {
@@ -322,7 +298,7 @@ synchronized void setNextTxId(long nextTxId) {
* NOTE: this should be done while holding the FSNamesystem lock, or
* else more operations can start writing while this is in progress.
*/
- void logSyncAll() throws IOException {
+ void logSyncAll() {
// Record the most recent transaction ID as our own id
synchronized (this) {
TransactionId id = myTransactionId.get();
@@ -366,74 +342,73 @@ public void logSync() {
// Fetch the transactionId of this thread.
long mytxid = myTransactionId.get().txid;
- List<JournalAndStream> candidateJournals =
- Lists.newArrayListWithCapacity(journals.size());
- List<JournalAndStream> badJournals = Lists.newArrayList();
-
boolean sync = false;
try {
+ EditLogOutputStream logStream = null;
synchronized (this) {
try {
- printStatistics(false);
-
- // if somebody is already syncing, then wait
- while (mytxid > synctxid && isSyncRunning) {
- try {
- wait(1000);
- } catch (InterruptedException ie) {
+ printStatistics(false);
+
+ // if somebody is already syncing, then wait
+ while (mytxid > synctxid && isSyncRunning) {
+ try {
+ wait(1000);
+ } catch (InterruptedException ie) {
+ }
}
- }
- //
- // If this transaction was already flushed, then nothing to do
- //
- if (mytxid <= synctxid) {
- numTransactionsBatchedInSync++;
- if (metrics != null) // Metrics is non-null only when used inside name node
- metrics.incrTransactionsBatchedInSync();
- return;
- }
+ //
+ // If this transaction was already flushed, then nothing to do
+ //
+ if (mytxid <= synctxid) {
+ numTransactionsBatchedInSync++;
+ if (metrics != null) {
+ // Metrics is non-null only when used inside name node
+ metrics.incrTransactionsBatchedInSync();
+ }
+ return;
+ }
- // now, this thread will do the sync
- syncStart = txid;
- isSyncRunning = true;
- sync = true;
+ // now, this thread will do the sync
+ syncStart = txid;
+ isSyncRunning = true;
+ sync = true;
- // swap buffers
- assert !journals.isEmpty() : "no editlog streams";
-
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
+ // swap buffers
try {
- jas.getCurrentStream().setReadyToFlush();
- candidateJournals.add(jas);
- } catch (IOException ie) {
- LOG.error("Unable to get ready to flush.", ie);
- badJournals.add(jas);
+ if (journalSet.isEmpty()) {
+ throw new IOException("No journals available to flush");
+ }
+ editLogStream.setReadyToFlush();
+ } catch (IOException e) {
+ LOG.fatal("Could not sync any journal to persistent storage. "
+ + "Unsynced transactions: " + (txid - synctxid),
+ new Exception());
+ runtime.exit(1);
}
- }
} finally {
// Prevent RuntimeException from blocking other log edit write
doneWithAutoSyncScheduling();
}
+ //editLogStream may become null,
+ //so store a local variable for flush.
+ logStream = editLogStream;
}
-
+
// do the sync
long start = now();
- for (JournalAndStream jas : candidateJournals) {
- if (!jas.isActive()) continue;
- try {
- jas.getCurrentStream().flush();
- } catch (IOException ie) {
- LOG.error("Unable to sync edit log.", ie);
- //
- // remember the streams that encountered an error.
- //
- badJournals.add(jas);
+ try {
+ if (logStream != null) {
+ logStream.flush();
+ }
+ } catch (IOException ex) {
+ synchronized (this) {
+ LOG.fatal("Could not sync any journal to persistent storage. "
+ + "Unsynced transactions: " + (txid - synctxid), new Exception());
+ runtime.exit(1);
}
}
long elapsed = now() - start;
- disableAndReportErrorOnJournals(badJournals);
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
@@ -443,13 +418,6 @@ public void logSync() {
// Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
if (sync) {
- if (badJournals.size() >= journals.size()) {
- LOG.fatal("Could not sync any journal to persistent storage. " +
- "Unsynced transactions: " + (txid - synctxid),
- new Exception());
- runtime.exit(1);
- }
-
synctxid = syncStart;
isSyncRunning = false;
}
@@ -466,9 +434,6 @@ private void printStatistics(boolean force) {
if (lastPrintTime + 60000 > now && !force) {
return;
}
- if (journals.isEmpty()) {
- return;
- }
lastPrintTime = now;
StringBuilder buf = new StringBuilder();
buf.append("Number of transactions: ");
@@ -478,20 +443,9 @@ private void printStatistics(boolean force) {
buf.append("Number of transactions batched in Syncs: ");
buf.append(numTransactionsBatchedInSync);
buf.append(" Number of syncs: ");
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
- buf.append(jas.getCurrentStream().getNumSync());
- break;
- }
-
+ buf.append(editLogStream.getNumSync());
buf.append(" SyncTimes(ms): ");
-
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
- EditLogOutputStream eStream = jas.getCurrentStream();
- buf.append(eStream.getTotalSyncTime());
- buf.append(" ");
- }
+ buf.append(journalSet.getSyncTimes());
LOG.info(buf);
}
@@ -664,7 +618,6 @@ void logSymlink(String path, String value, long mtime,
* log delegation token to edit log
* @param id DelegationTokenIdentifier
* @param expiryTime of the token
- * @return
*/
void logGetDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
@@ -702,25 +655,12 @@ void logReassignLease(String leaseHolder, String src, String newHolder) {
logEdit(op);
}
- /**
- * @return the number of active (non-failed) journals
- */
- private int countActiveJournals() {
- int count = 0;
- for (JournalAndStream jas : journals) {
- if (jas.isActive()) {
- count++;
- }
- }
- return count;
- }
-
/**
* Used only by unit tests.
*/
@VisibleForTesting
List<JournalAndStream> getJournals() {
- return journals;
+ return journalSet.getAllJournalStreams();
}
/**
@@ -734,62 +674,9 @@ synchronized void setRuntimeForTesting(Runtime runtime) {
/**
* Return a manifest of what finalized edit logs are available
*/
- public synchronized RemoteEditLogManifest getEditLogManifest(
- long fromTxId) throws IOException {
- // Collect RemoteEditLogs available from each FileJournalManager
- List<RemoteEditLog> allLogs = Lists.newArrayList();
- for (JournalAndStream j : journals) {
- if (j.getManager() instanceof FileJournalManager) {
- FileJournalManager fjm = (FileJournalManager)j.getManager();
- try {
- allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
- } catch (Throwable t) {
- LOG.warn("Cannot list edit logs in " + fjm, t);
- }
- }
- }
-
- // Group logs by their starting txid
- ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
- Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
- long curStartTxId = fromTxId;
-
- List<RemoteEditLog> logs = Lists.newArrayList();
- while (true) {
- ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
- if (logGroup.isEmpty()) {
- // we have a gap in logs - for example because we recovered some old
- // storage directory with ancient logs. Clear out any logs we've
- // accumulated so far, and then skip to the next segment of logs
- // after the gap.
- SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
- startTxIds = startTxIds.tailSet(curStartTxId);
- if (startTxIds.isEmpty()) {
- break;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found gap in logs at " + curStartTxId + ": " +
- "not returning previous logs in manifest.");
- }
- logs.clear();
- curStartTxId = startTxIds.first();
- continue;
- }
- }
-
- // Find the one that extends the farthest forward
- RemoteEditLog bestLog = Collections.max(logGroup);
- logs.add(bestLog);
- // And then start looking from after that point
- curStartTxId = bestLog.getEndTxId() + 1;
- }
- RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Generated manifest for logs since " + fromTxId + ":"
- + ret);
- }
- return ret;
+ public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
+ throws IOException {
+ return journalSet.getEditLogManifest(fromTxId);
}
/**
@@ -832,14 +719,9 @@ synchronized void startLogSegment(final long segmentTxId,
// See HDFS-2174.
storage.attemptRestoreRemovedStorage();
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- jas.startLogSegment(segmentTxId);
- }
- }, "starting log segment " + segmentTxId);
-
- if (countActiveJournals() == 0) {
+ try {
+ editLogStream = journalSet.startLogSegment(segmentTxId);
+ } catch (IOException ex) {
throw new IOException("Unable to start log segment " +
segmentTxId + ": no journals successfully started.");
}
@@ -873,14 +755,12 @@ synchronized void endCurrentLogSegment(boolean writeEndTxn) {
final long lastTxId = getLastWrittenTxId();
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- if (jas.isActive()) {
- jas.close(lastTxId);
- }
- }
- }, "ending log segment");
+ try {
+ journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
+ editLogStream = null;
+ } catch (IOException e) {
+ //All journals have failed, it will be handled in logSync.
+ }
state = State.BETWEEN_LOG_SEGMENTS;
}
@@ -889,14 +769,15 @@ public void apply(JournalAndStream jas) throws IOException {
* Abort all current logs. Called from the backup node.
*/
synchronized void abortCurrentLogSegment() {
- mapJournalsAndReportErrors(new JournalClosure() {
-
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- jas.abort();
+ try {
+ //Check for null, as abort can be called any time.
+ if (editLogStream != null) {
+ editLogStream.abort();
+ editLogStream = null;
}
- }, "aborting all streams");
- state = State.BETWEEN_LOG_SEGMENTS;
+ } catch (IOException e) {
+ LOG.warn("All journals failed to abort", e);
+ }
}
/**
@@ -912,13 +793,12 @@ public void purgeLogsOlderThan(final long minTxIdToKeep) {
"cannot purge logs older than txid " + minTxIdToKeep +
" when current segment starts at " + curSegmentTxId;
}
-
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- jas.manager.purgeLogsOlderThan(minTxIdToKeep);
- }
- }, "purging logs older than " + minTxIdToKeep);
+
+ try {
+ journalSet.purgeLogsOlderThan(minTxIdToKeep);
+ } catch (IOException ex) {
+ //All journals have failed, it will be handled in logSync.
+ }
}
@@ -946,9 +826,7 @@ synchronized long getSyncTxId() {
// sets the initial capacity of the flush buffer.
public void setOutputBufferCapacity(int size) {
- for (JournalAndStream jas : journals) {
- jas.manager.setOutputBufferCapacity(size);
- }
+ journalSet.setOutputBufferCapacity(size);
}
/**
@@ -969,7 +847,7 @@ synchronized void registerBackupNode(
if(bnReg.isRole(NamenodeRole.CHECKPOINT))
return; // checkpoint node does not stream edits
- JournalAndStream jas = findBackupJournalAndStream(bnReg);
+ JournalManager jas = findBackupJournal(bnReg);
if (jas != null) {
// already registered
LOG.info("Backup node " + bnReg + " re-registers");
@@ -978,35 +856,29 @@ synchronized void registerBackupNode(
LOG.info("Registering new backup node: " + bnReg);
BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
- journals.add(new JournalAndStream(bjm));
+ journalSet.add(bjm);
}
- synchronized void releaseBackupStream(NamenodeRegistration registration) {
- for (Iterator<JournalAndStream> iter = journals.iterator();
- iter.hasNext();) {
- JournalAndStream jas = iter.next();
- if (jas.manager instanceof BackupJournalManager &&
- ((BackupJournalManager)jas.manager).matchesRegistration(
- registration)) {
- jas.abort();
- LOG.info("Removing backup journal " + jas);
- iter.remove();
- }
+ synchronized void releaseBackupStream(NamenodeRegistration registration)
+ throws IOException {
+ BackupJournalManager bjm = this.findBackupJournal(registration);
+ if (bjm != null) {
+ LOG.info("Removing backup journal " + bjm);
+ journalSet.remove(bjm);
}
}
/**
* Find the JournalAndStream associated with this BackupNode.
+ *
* @return null if it cannot be found
*/
- private synchronized JournalAndStream findBackupJournalAndStream(
+ private synchronized BackupJournalManager findBackupJournal(
NamenodeRegistration bnReg) {
- for (JournalAndStream jas : journals) {
- if (jas.manager instanceof BackupJournalManager) {
- BackupJournalManager bjm = (BackupJournalManager)jas.manager;
- if (bjm.matchesRegistration(bnReg)) {
- return jas;
- }
+ for (JournalManager bjm : journalSet.getJournalManagers()) {
+ if ((bjm instanceof BackupJournalManager)
+ && ((BackupJournalManager) bjm).matchesRegistration(bnReg)) {
+ return (BackupJournalManager) bjm;
}
}
return null;
@@ -1018,124 +890,24 @@ private synchronized JournalAndStream findBackupJournalAndStream(
*/
synchronized void logEdit(final int length, final byte[] data) {
long start = beginTransaction();
-
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- if (jas.isActive()) {
- jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
- }
- }
- }, "Logging edit");
+ try {
+ editLogStream.writeRaw(data, 0, length);
+ } catch (IOException ex) {
+ // All journals have failed, it will be handled in logSync.
+ }
endTransaction(start);
}
- //// Iteration across journals
- private interface JournalClosure {
- public void apply(JournalAndStream jas) throws IOException;
- }
-
- /**
- * Apply the given function across all of the journal managers, disabling
- * any for which the closure throws an IOException.
- * @param status message used for logging errors (e.g. "opening journal")
- */
- private void mapJournalsAndReportErrors(
- JournalClosure closure, String status) {
- List<JournalAndStream> badJAS = Lists.newLinkedList();
- for (JournalAndStream jas : journals) {
- try {
- closure.apply(jas);
- } catch (Throwable t) {
- LOG.error("Error " + status + " (journal " + jas + ")", t);
- badJAS.add(jas);
- }
- }
-
- disableAndReportErrorOnJournals(badJAS);
- }
-
- /**
- * Called when some journals experience an error in some operation.
- * This propagates errors to the storage level.
- */
- private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
- if (badJournals == null || badJournals.isEmpty()) {
- return; // nothing to do
- }
-
- for (JournalAndStream j : badJournals) {
- LOG.error("Disabling journal " + j);
- j.abort();
- }
- }
-
- /**
- * Find the best editlog input stream to read from txid. In this case
- * best means the editlog which has the largest continuous range of
- * transactions starting from the transaction id, fromTxId.
- *
- * If a journal throws an CorruptionException while reading from a txn id,
- * it means that it has more transactions, but can't find any from fromTxId.
- * If this is the case and no other journal has transactions, we should throw
- * an exception as it means more transactions exist, we just can't load them.
- *
- * @param fromTxId Transaction id to start from.
- * @return a edit log input stream with tranactions fromTxId
- * or null if no more exist
- */
- private EditLogInputStream selectStream(long fromTxId)
- throws IOException {
- JournalManager bestjm = null;
- long bestjmNumTxns = 0;
- CorruptionException corruption = null;
-
- for (JournalAndStream jas : journals) {
- JournalManager candidate = jas.getManager();
- long candidateNumTxns = 0;
- try {
- candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
- } catch (CorruptionException ce) {
- corruption = ce;
- } catch (IOException ioe) {
- LOG.warn("Error reading number of transactions from " + candidate);
- continue; // error reading disk, just skip
- }
-
- if (candidateNumTxns > bestjmNumTxns) {
- bestjm = candidate;
- bestjmNumTxns = candidateNumTxns;
- }
- }
-
-
- if (bestjm == null) {
- /**
- * If all candidates either threw a CorruptionException or
- * found 0 transactions, then a gap exists.
- */
- if (corruption != null) {
- throw new IOException("Gap exists in logs from "
- + fromTxId, corruption);
- } else {
- return null;
- }
- }
-
- return bestjm.getInputStream(fromTxId);
- }
-
/**
* Run recovery on all journals to recover any unclosed segments
*/
void recoverUnclosedStreams() {
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- jas.manager.recoverUnfinalizedSegments();
- }
- }, "recovering unclosed streams");
+ try {
+ journalSet.recoverUnfinalizedSegments();
+ } catch (IOException ex) {
+ // All journals have failed, it is handled in logSync.
+ }
}
/**
@@ -1143,23 +915,16 @@ public void apply(JournalAndStream jas) throws IOException {
* @param fromTxId first transaction in the selected streams
* @param toAtLeast the selected streams must contain this transaction
*/
- Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId)
- throws IOException {
- List<EditLogInputStream> streams = Lists.newArrayList();
-
- boolean gapFound = false;
- EditLogInputStream stream = selectStream(fromTxId);
+ Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+ long toAtLeastTxId) throws IOException {
+ List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+ EditLogInputStream stream = journalSet.getInputStream(fromTxId);
while (stream != null) {
fromTxId = stream.getLastTxId() + 1;
streams.add(stream);
- try {
- stream = selectStream(fromTxId);
- } catch (IOException ioe) {
- gapFound = true;
- break;
- }
+ stream = journalSet.getInputStream(fromTxId);
}
- if (fromTxId <= toAtLeastTxId || gapFound) {
+ if (fromTxId <= toAtLeastTxId) {
closeAllStreams(streams);
throw new IOException("No non-corrupt logs for txid "
+ fromTxId);
@@ -1176,75 +941,4 @@ static void closeAllStreams(Iterable<EditLogInputStream> streams) {
IOUtils.closeStream(s);
}
}
-
- /**
- * Container for a JournalManager paired with its currently
- * active stream.
- *
- * If a Journal gets disabled due to an error writing to its
- * stream, then the stream will be aborted and set to null.
- */
- static class JournalAndStream {
- private final JournalManager manager;
- private EditLogOutputStream stream;
- private long segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-
- private JournalAndStream(JournalManager manager) {
- this.manager = manager;
- }
-
- private void startLogSegment(long txId) throws IOException {
- Preconditions.checkState(stream == null);
- stream = manager.startLogSegment(txId);
- segmentStartsAtTxId = txId;
- }
-
- private void close(long lastTxId) throws IOException {
- Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
- "invalid segment: lastTxId %s >= " +
- "segment starting txid %s", lastTxId, segmentStartsAtTxId);
-
- if (stream == null) return;
- stream.close();
- manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
- stream = null;
- }
-
- @VisibleForTesting
- void abort() {
- if (stream == null) return;
- try {
- stream.abort();
- } catch (IOException ioe) {
- LOG.error("Unable to abort stream " + stream, ioe);
- }
- stream = null;
- segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
- }
-
- private boolean isActive() {
- return stream != null;
- }
-
- @VisibleForTesting
- EditLogOutputStream getCurrentStream() {
- return stream;
- }
-
- @Override
- public String toString() {
- return "JournalAndStream(mgr=" + manager +
- ", " + "stream=" + stream + ")";
- }
-
- @VisibleForTesting
- void setCurrentStreamForTests(EditLogOutputStream stream) {
- this.stream = stream;
- }
-
- @VisibleForTesting
- JournalManager getManager() {
- return manager;
- }
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index 6e4c17161a..8cfc975823 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -23,7 +23,6 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
-import java.util.HashMap;
import java.util.Comparator;
import java.util.Collections;
import java.util.regex.Matcher;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
index 8440fe049b..0bb7b0f8aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
@@ -19,7 +19,6 @@
import java.io.IOException;
-import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
/**
* A JournalManager is responsible for managing a single place of storing
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
new file mode 100644
index 0000000000..0d6bc743da
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Sets;
+
+/**
+ * Manages a collection of Journals. None of the methods are synchronized; it is
+ * assumed that the FSEditLog methods that use this class provide proper
+ * synchronization.
+ */
+public class JournalSet implements JournalManager {
+
+ static final Log LOG = LogFactory.getLog(FSEditLog.class);
+
+ /**
+ * Container for a JournalManager paired with its currently
+ * active stream.
+ *
+ * If a Journal gets disabled due to an error writing to its
+ * stream, then the stream will be aborted and set to null.
+ *
+ * This should be used outside JournalSet only for testing.
+ */
+ @VisibleForTesting
+ static class JournalAndStream {
+ private final JournalManager journal;
+ private boolean disabled = false;
+ private EditLogOutputStream stream;
+
+ public JournalAndStream(JournalManager manager) {
+ this.journal = manager;
+ }
+
+ public void startLogSegment(long txId) throws IOException {
+ Preconditions.checkState(stream == null);
+ disabled = false;
+ stream = journal.startLogSegment(txId);
+ }
+
+ /**
+ * Closes the stream, also sets it to null.
+ */
+ public void close() throws IOException {
+ if (stream == null) return;
+ stream.close();
+ stream = null;
+ }
+
+ /**
+ * Aborts the stream, also sets it to null.
+ */
+ public void abort() {
+ if (stream == null) return;
+ try {
+ stream.abort();
+ } catch (IOException ioe) {
+ LOG.error("Unable to abort stream " + stream, ioe);
+ }
+ stream = null;
+ }
+
+ boolean isActive() {
+ return stream != null;
+ }
+
+ /**
+ * Should be used outside JournalSet only for testing.
+ */
+ EditLogOutputStream getCurrentStream() {
+ return stream;
+ }
+
+ @Override
+ public String toString() {
+ return "JournalAndStream(mgr=" + journal +
+ ", " + "stream=" + stream + ")";
+ }
+
+ void setCurrentStreamForTests(EditLogOutputStream stream) {
+ this.stream = stream;
+ }
+
+ JournalManager getManager() {
+ return journal;
+ }
+
+ private boolean isDisabled() {
+ return disabled;
+ }
+
+ private void setDisabled(boolean disabled) {
+ this.disabled = disabled;
+ }
+ }
+
+ private List<JournalAndStream> journals = Lists.newArrayList();
+
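+ /**
+ * Starts a new log segment in each underlying journal and returns a single
+ * JournalSetOutputStream that fans every write out to all currently active
+ * journal streams.
+ */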
+ @Override
+ public EditLogOutputStream startLogSegment(final long txId) throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ jas.startLogSegment(txId);
+ }
+ }, "starting log segment " + txId);
+ return new JournalSetOutputStream();
+ }
+
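+ /**
+ * Closes the active stream of each journal that is still active and asks its
+ * manager to finalize the segment [firstTxId, lastTxId].
+ */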
+ @Override
+ public void finalizeLogSegment(final long firstTxId, final long lastTxId)
+ throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ if (jas.isActive()) {
+ jas.close();
+ jas.getManager().finalizeLogSegment(firstTxId, lastTxId);
+ }
+ }
+ }, "finalize log segment " + firstTxId + ", " + lastTxId);
+ }
+
+
+ /**
+ * Find the best edit log input stream from which to read, starting at the
+ * given transaction id. If a journal throws a CorruptionException while
+ * reading from a txn id, it means that it has more transactions, but cannot
+ * find any starting from fromTxnId. If that is the case and no other journal
+ * has these transactions, we should throw an exception, as it means more
+ * transactions exist but we cannot load them.
+ *
+ * @param fromTxnId Transaction id to start from.
+ * @return An edit log input stream with transactions starting at fromTxnId,
+ * or null if no more transactions exist.
+ */
+ @Override
+ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
+ JournalManager bestjm = null;
+ long bestjmNumTxns = 0;
+ CorruptionException corruption = null;
+
+ for (JournalAndStream jas : journals) {
+ JournalManager candidate = jas.getManager();
+ long candidateNumTxns = 0;
+ try {
+ candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId);
+ } catch (CorruptionException ce) {
+ corruption = ce;
+ } catch (IOException ioe) {
+ continue; // error reading disk, just skip
+ }
+
+ if (candidateNumTxns > bestjmNumTxns) {
+ bestjm = candidate;
+ bestjmNumTxns = candidateNumTxns;
+ }
+ }
+
+ if (bestjm == null) {
+ if (corruption != null) {
+ throw new IOException("No non-corrupt logs for txid "
+ + fromTxnId, corruption);
+ } else {
+ return null;
+ }
+ }
+ return bestjm.getInputStream(fromTxnId);
+ }
+
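+ /**
+ * Returns the largest number of transactions, starting at fromTxnId, that is
+ * reported by any of the active journals.
+ */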
+ @Override
+ public long getNumberOfTransactions(long fromTxnId) throws IOException {
+ long num = 0;
+ for (JournalAndStream jas: journals) {
+ if (jas.isActive()) {
+ long newNum = jas.getManager().getNumberOfTransactions(fromTxnId);
+ if (newNum > num) {
+ num = newNum;
+ }
+ }
+ }
+ return num;
+ }
+
+ /**
+ * Returns true if there are no journals or all are disabled.
+ * @return True if no journals or all are disabled.
+ */
+ public boolean isEmpty() {
+ for (JournalAndStream jas : journals) {
+ if (!jas.isDisabled()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Called when some journals experience an error in some operation.
+ */
+ private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
+ if (badJournals == null || badJournals.isEmpty()) {
+ return; // nothing to do
+ }
+
+ for (JournalAndStream j : badJournals) {
+ LOG.error("Disabling journal " + j);
+ j.abort();
+ j.setDisabled(true);
+ }
+ }
+
+ /**
+ * Implementations of this interface encapsulate operations that can be
+ * iteratively applied on all the journals. For example see
+ * {@link JournalSet#mapJournalsAndReportErrors}.
+ */
+ private interface JournalClosure {
+ /**
+ * The operation on JournalAndStream.
+ * @param jas Object on which operations are performed.
+ * @throws IOException
+ */
+ public void apply(JournalAndStream jas) throws IOException;
+ }
+
+ /**
+ * Apply the given operation across all of the journal managers, disabling
+ * any for which the closure throws an IOException.
+ * @param closure {@link JournalClosure} object encapsulating the operation.
+ * @param status message used for logging errors (e.g. "opening journal")
+ * @throws IOException If the operation fails on all the journals.
+ */
+ private void mapJournalsAndReportErrors(
+ JournalClosure closure, String status) throws IOException{
+ List<JournalAndStream> badJAS = Lists.newLinkedList();
+ for (JournalAndStream jas : journals) {
+ try {
+ closure.apply(jas);
+ } catch (Throwable t) {
+ LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
+ badJAS.add(jas);
+ }
+ }
+ disableAndReportErrorOnJournals(badJAS);
+ if (badJAS.size() >= journals.size()) {
+ LOG.error("Error: "+status+" failed for all journals");
+ throw new IOException(status+" failed on all the journals");
+ }
+ }
+
+ /**
+ * An implementation of EditLogOutputStream that applies a requested method on
+ * all the journals that are currently active.
+ */
+ private class JournalSetOutputStream extends EditLogOutputStream {
+
+ JournalSetOutputStream() throws IOException {
+ super();
+ }
+
+ @Override
+ void write(final FSEditLogOp op)
+ throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ if (jas.isActive()) {
+ jas.getCurrentStream().write(op);
+ }
+ }
+ }, "write op");
+ }
+
+ @Override
+ void writeRaw(final byte[] data, final int offset, final int length)
+ throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ if (jas.isActive()) {
+ jas.getCurrentStream().writeRaw(data, offset, length);
+ }
+ }
+ }, "write bytes");
+ }
+
+ @Override
+ void create() throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ if (jas.isActive()) {
+ jas.getCurrentStream().create();
+ }
+ }
+ }, "create");
+ }
+
+ @Override
+ public void close() throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ jas.close();
+ }
+ }, "close");
+ }
+
+ @Override
+ public void abort() throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ jas.abort();
+ }
+ }, "abort");
+ }
+
+ @Override
+ void setReadyToFlush() throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ if (jas.isActive()) {
+ jas.getCurrentStream().setReadyToFlush();
+ }
+ }
+ }, "setReadyToFlush");
+ }
+
+ @Override
+ protected void flushAndSync() throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ if (jas.isActive()) {
+ jas.getCurrentStream().flushAndSync();
+ }
+ }
+ }, "flushAndSync");
+ }
+
+ @Override
+ public void flush() throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ if (jas.isActive()) {
+ jas.getCurrentStream().flush();
+ }
+ }
+ }, "flush");
+ }
+
+ @Override
+ public boolean shouldForceSync() {
+ for (JournalAndStream js : journals) {
+ if (js.isActive() && js.getCurrentStream().shouldForceSync()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
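+ /**
+ * Returns the sync count of the first active journal stream; the counters
+ * of all active streams normally advance together, since flushes are
+ * applied to every active journal.
+ */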
+ @Override
+ protected long getNumSync() {
+ for (JournalAndStream jas : journals) {
+ if (jas.isActive()) {
+ return jas.getCurrentStream().getNumSync();
+ }
+ }
+ return 0;
+ }
+ }
+
+ @Override
+ public void setOutputBufferCapacity(final int size) {
+ try {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ jas.getManager().setOutputBufferCapacity(size);
+ }
+ }, "setOutputBufferCapacity");
+ } catch (IOException e) {
+ LOG.error("Error in setting outputbuffer capacity");
+ }
+ }
+
+ @VisibleForTesting
+ List<JournalAndStream> getAllJournalStreams() {
+ return journals;
+ }
+
+ List<JournalManager> getJournalManagers() {
+ List<JournalManager> jList = new ArrayList<JournalManager>();
+ for (JournalAndStream j : journals) {
+ jList.add(j.getManager());
+ }
+ return jList;
+ }
+
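+ /**
+ * Registers a new JournalManager. Its output stream is created lazily when
+ * the next log segment is started.
+ */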
+ void add(JournalManager j) {
+ journals.add(new JournalAndStream(j));
+ }
+
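+ /**
+ * Removes the given JournalManager, aborting its currently active stream if
+ * one is open.
+ */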
+ void remove(JournalManager j) {
+ JournalAndStream jasToRemove = null;
+ for (JournalAndStream jas: journals) {
+ if (jas.getManager().equals(j)) {
+ jasToRemove = jas;
+ break;
+ }
+ }
+ if (jasToRemove != null) {
+ jasToRemove.abort();
+ journals.remove(jasToRemove);
+ }
+ }
+
+ @Override
+ public void purgeLogsOlderThan(final long minTxIdToKeep) throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ jas.getManager().purgeLogsOlderThan(minTxIdToKeep);
+ }
+ }, "purgeLogsOlderThan " + minTxIdToKeep);
+ }
+
+ @Override
+ public void recoverUnfinalizedSegments() throws IOException {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ jas.getManager().recoverUnfinalizedSegments();
+ }
+ }, "recoverUnfinalizedSegments");
+ }
+
+ /**
+ * Return a manifest of what finalized edit logs are available. All available
+ * edit logs are returned starting from the transaction id passed.
+ *
+ * @param fromTxId Starting transaction id to read the logs.
+ * @return RemoteEditLogManifest object.
+ */
+ public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
+ // Collect RemoteEditLogs available from each FileJournalManager
+ List<RemoteEditLog> allLogs = Lists.newArrayList();
+ for (JournalAndStream j : journals) {
+ if (j.getManager() instanceof FileJournalManager) {
+ FileJournalManager fjm = (FileJournalManager)j.getManager();
+ try {
+ allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
+ } catch (Throwable t) {
+ LOG.warn("Cannot list edit logs in " + fjm, t);
+ }
+ }
+ }
+
+ // Group logs by their starting txid
+ ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
+ Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
+ long curStartTxId = fromTxId;
+
+ List<RemoteEditLog> logs = Lists.newArrayList();
+ while (true) {
+ ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
+ if (logGroup.isEmpty()) {
+ // we have a gap in logs - for example because we recovered some old
+ // storage directory with ancient logs. Clear out any logs we've
+ // accumulated so far, and then skip to the next segment of logs
+ // after the gap.
+ SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
+ startTxIds = startTxIds.tailSet(curStartTxId);
+ if (startTxIds.isEmpty()) {
+ break;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found gap in logs at " + curStartTxId + ": " +
+ "not returning previous logs in manifest.");
+ }
+ logs.clear();
+ curStartTxId = startTxIds.first();
+ continue;
+ }
+ }
+
+ // Find the one that extends the farthest forward
+ RemoteEditLog bestLog = Collections.max(logGroup);
+ logs.add(bestLog);
+ // And then start looking from after that point
+ curStartTxId = bestLog.getEndTxId() + 1;
+ }
+ RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated manifest for logs since " + fromTxId + ":"
+ + ret);
+ }
+ return ret;
+ }
+
+ /**
+ * Return a string, suitable for logging, of the sync times (in ms) of the active journal streams.
+ */
+ String getSyncTimes() {
+ StringBuilder buf = new StringBuilder();
+ for (JournalAndStream jas : journals) {
+ if (jas.isActive()) {
+ buf.append(jas.getCurrentStream().getTotalSyncTime());
+ buf.append(" ");
+ }
+ }
+ return buf.toString();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
index 81133242a0..8711898692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -33,6 +34,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -73,7 +75,7 @@ public void shutDownMiniCluster() throws IOException {
public void testSingleFailedEditsDirOnFlush() throws IOException {
assertTrue(doAnEdit());
// Invalidate one edits journal.
- invalidateEditsDirAtIndex(0, true);
+ invalidateEditsDirAtIndex(0, true, false);
// Make sure runtime.exit(...) hasn't been called at all yet.
assertExitInvocations(0);
assertTrue(doAnEdit());
@@ -86,8 +88,22 @@ public void testSingleFailedEditsDirOnFlush() throws IOException {
public void testAllEditsDirsFailOnFlush() throws IOException {
assertTrue(doAnEdit());
// Invalidate both edits journals.
- invalidateEditsDirAtIndex(0, true);
- invalidateEditsDirAtIndex(1, true);
+ invalidateEditsDirAtIndex(0, true, false);
+ invalidateEditsDirAtIndex(1, true, false);
+ // Make sure runtime.exit(...) hasn't been called at all yet.
+ assertExitInvocations(0);
+ assertTrue(doAnEdit());
+ // The previous edit could not be synced to any persistent storage, should
+ // have halted the NN.
+ assertExitInvocations(1);
+ }
+
+ @Test
+ public void testAllEditsDirFailOnWrite() throws IOException {
+ assertTrue(doAnEdit());
+ // Invalidate both edits journals.
+ invalidateEditsDirAtIndex(0, true, true);
+ invalidateEditsDirAtIndex(1, true, true);
// Make sure runtime.exit(...) hasn't been called at all yet.
assertExitInvocations(0);
assertTrue(doAnEdit());
@@ -100,7 +116,7 @@ public void testAllEditsDirsFailOnFlush() throws IOException {
public void testSingleFailedEditsDirOnSetReadyToFlush() throws IOException {
assertTrue(doAnEdit());
// Invalidate one edits journal.
- invalidateEditsDirAtIndex(0, false);
+ invalidateEditsDirAtIndex(0, false, false);
// Make sure runtime.exit(...) hasn't been called at all yet.
assertExitInvocations(0);
assertTrue(doAnEdit());
@@ -117,16 +133,18 @@ public void testSingleFailedEditsDirOnSetReadyToFlush() throws IOException {
* @return the original EditLogOutputStream
* of the journal.
*/
private EditLogOutputStream invalidateEditsDirAtIndex(int index,
- boolean failOnFlush) throws IOException {
+ boolean failOnFlush, boolean failOnWrite) throws IOException {
FSImage fsimage = cluster.getNamesystem().getFSImage();
FSEditLog editLog = fsimage.getEditLog();
-
- FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+ JournalAndStream jas = editLog.getJournals().get(index);
EditLogFileOutputStream elos =
(EditLogFileOutputStream) jas.getCurrentStream();
EditLogFileOutputStream spyElos = spy(elos);
-
+ if (failOnWrite) {
+ doThrow(new IOException("fail on write()")).when(spyElos).write(
+ (FSEditLogOp) any());
+ }
if (failOnFlush) {
doThrow(new IOException("fail on flush()")).when(spyElos).flush();
} else {
@@ -151,7 +169,7 @@ private void restoreEditsDirAtIndex(int index, EditLogOutputStream elos) {
FSImage fsimage = cluster.getNamesystem().getFSImage();
FSEditLog editLog = fsimage.getEditLog();
- FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+ JournalAndStream jas = editLog.getJournals().get(index);
jas.setCurrentStreamForTests(elos);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index bc5aa162fb..d3d64594ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.log4j.Level;
@@ -356,7 +357,7 @@ public void testSaveImageWhileSyncInProgress() throws Exception {
FSImage fsimage = namesystem.getFSImage();
FSEditLog editLog = fsimage.getEditLog();
- FSEditLog.JournalAndStream jas = editLog.getJournals().get(0);
+ JournalAndStream jas = editLog.getJournals().get(0);
EditLogFileOutputStream spyElos =
spy((EditLogFileOutputStream)jas.getCurrentStream());
jas.setCurrentStreamForTests(spyElos);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
index 11152883c8..f3a4638f10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
@@ -28,7 +28,6 @@
import java.util.Iterator;
import java.util.Set;
-import static org.mockito.Matchers.anyByte;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -45,7 +44,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName;
@@ -123,7 +122,7 @@ public void invalidateStorage(FSImage fi, Set<File> filesToInvalidate) throws IO
// simulate an error
fi.getStorage().reportErrorsOnDirectories(al);
- for (FSEditLog.JournalAndStream j : fi.getEditLog().getJournals()) {
+ for (JournalAndStream j : fi.getEditLog().getJournals()) {
if (j.getManager() instanceof FileJournalManager) {
FileJournalManager fm = (FileJournalManager)j.getManager();
if (fm.getStorageDirectory().getRoot().equals(path2)