HDFS-3273. Refactor BackupImage and FSEditLog, and rename JournalListener.rollLogs(..) to startLogSegment(..).

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1326016 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 2012-04-13 23:41:01 +00:00
parent f694ecdd93
commit 841fdc5628
8 changed files with 93 additions and 54 deletions

CHANGES.txt

@@ -62,6 +62,9 @@ Trunk (unreleased changes)
     HDFS-3178. Add states and state handler for journal synchronization in
     JournalService. (szetszwo)
 
+    HDFS-3273. Refactor BackupImage and FSEditLog, and rename
+    JournalListener.rollLogs(..) to startLogSegment(..). (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.

JournalListener.java

@@ -60,5 +60,5 @@ public void journal(JournalService service, long firstTxnId, int numTxns,
    * Any IOException thrown from the listener is thrown back in
    * {@link JournalProtocol#startLogSegment}
    */
-  public void rollLogs(JournalService service, long txid) throws IOException;
+  public void startLogSegment(JournalService service, long txid) throws IOException;
 }
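
For context, here is a minimal sketch of what an implementation looks like under the renamed callback. The interface below is a simplified, hypothetical stand-in for JournalListener (JournalService is stubbed as Object), so it matches the shape of this diff rather than the full Hadoop API:

import java.io.IOException;

// Simplified stand-in for org.apache.hadoop.hdfs.server.journalservice.JournalListener.
interface Listener {
  // Receives a batch of serialized edits starting at firstTxnId.
  void journal(Object service, long firstTxnId, int numTxns, byte[] records)
      throws IOException;

  // Formerly rollLogs(service, txid); renamed so the callback matches
  // JournalProtocol#startLogSegment, the RPC that triggers it.
  void startLogSegment(Object service, long txid) throws IOException;
}

class LoggingListener implements Listener {
  @Override
  public void journal(Object service, long firstTxnId, int numTxns, byte[] records) {
    System.out.println("journal: firstTxnId=" + firstTxnId + ", numTxns=" + numTxns);
  }

  @Override
  public void startLogSegment(Object service, long txid) {
    System.out.println("startLogSegment: txid=" + txid);
  }
}

The rename is mechanical, but it removes a naming mismatch: the RPC is called startLogSegment, so the listener callback it fires now carries the same name.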

JournalService.java

@@ -256,7 +256,7 @@ public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
     }
     stateHandler.isStartLogSegmentAllowed();
     verify(epoch, journalInfo);
-    listener.rollLogs(this, txid);
+    listener.startLogSegment(this, txid);
     stateHandler.startLogSegment();
   }

BackupImage.java

@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;

@@ -183,21 +184,9 @@ synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException
     }
     // write to BN's local edit log.
-    logEditsLocally(firstTxId, numTxns, data);
+    editLog.journal(firstTxId, numTxns, data);
   }
 
-  /**
-   * Write the batch of edits to the local copy of the edit logs.
-   */
-  private void logEditsLocally(long firstTxId, int numTxns, byte[] data) {
-    long expectedTxId = editLog.getLastWrittenTxId() + 1;
-    Preconditions.checkState(firstTxId == expectedTxId,
-        "received txid batch starting at %s but expected txn %s",
-        firstTxId, expectedTxId);
-    editLog.setNextTxId(firstTxId + numTxns - 1);
-    editLog.logEdit(data.length, data);
-    editLog.logSync();
-  }
-
   /**
    * Apply the batch of edits to the local namespace.

@@ -342,28 +331,9 @@ private synchronized void setState(BNState newState) {
    * This causes the BN to also start the new edit log in its local
    * directories.
    */
-  synchronized void namenodeStartedLogSegment(long txid)
-      throws IOException {
-    LOG.info("NameNode started a new log segment at txid " + txid);
-    if (editLog.isSegmentOpen()) {
-      if (editLog.getLastWrittenTxId() == txid - 1) {
-        // We are in sync with the NN, so end and finalize the current segment
-        editLog.endCurrentLogSegment(false);
-      } else {
-        // We appear to have missed some transactions -- the NN probably
-        // lost contact with us temporarily. So, mark the current segment
-        // as aborted.
-        LOG.warn("NN started new log segment at txid " + txid +
-            ", but BN had only written up to txid " +
-            editLog.getLastWrittenTxId() +
-            "in the log segment starting at " +
-            editLog.getCurSegmentTxId() + ". Aborting this " +
-            "log segment.");
-        editLog.abortCurrentLogSegment();
-      }
-    }
-    editLog.setNextTxId(txid);
-    editLog.startLogSegment(txid, false);
+  synchronized void namenodeStartedLogSegment(long txid) throws IOException {
+    editLog.startLogSegment(txid, true);
 
     if (bnState == BNState.DROP_UNTIL_NEXT_ROLL) {
       setState(BNState.JOURNAL_ONLY);
     }
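
The net effect of this hunk is that the segment-rollover policy formerly hard-coded in BackupImage now lives behind FSEditLog.startLogSegment(txid, abortCurrentLogSegment). A self-contained sketch of that decision logic, using a hypothetical EditLogStub in place of the real FSEditLog:

import java.io.IOException;

// Hypothetical stub modeling the rollover decision this commit moves into FSEditLog.
class EditLogStub {
  private boolean segmentOpen;
  private long lastWrittenTxId;
  private long curSegmentTxId;

  synchronized void startLogSegment(long txid, boolean abortCurrentLogSegment)
      throws IOException {
    if (segmentOpen) {
      if (lastWrittenTxId == txid - 1) {
        // In sync with the remote namenode: finalize the current segment cleanly.
        endCurrentLogSegment();
      } else if (abortCurrentLogSegment) {
        // Transactions were missed (e.g. the NN lost contact): abort the segment.
        abortCurrentLogSegment();
      } else {
        throw new IOException("Cannot start a new log segment at txid " + txid
            + " since only up to txid " + lastWrittenTxId
            + " have been written in the segment starting at "
            + curSegmentTxId + ".");
      }
    }
    // Open the new segment at txid.
    curSegmentTxId = txid;
    lastWrittenTxId = txid - 1;
    segmentOpen = true;
  }

  private void endCurrentLogSegment() { segmentOpen = false; }
  private void abortCurrentLogSegment() { segmentOpen = false; }
}

The BackupNode always passes abortCurrentLogSegment=true, preserving the old behavior; the boolean lets other callers demand strict txid continuity and fail fast instead.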

NameNodeRpcServer.java

@@ -260,7 +260,7 @@ private void verifyJournalRequest(JournalInfo journalInfo)
   }
 
   /////////////////////////////////////////////////////
-  // BackupNodeProtocol implementation for backup node.
+  // JournalProtocol implementation for backup node.
   /////////////////////////////////////////////////////
   @Override
   public void startLogSegment(JournalInfo journalInfo, long epoch,

FSEditLog.java

@@ -18,18 +18,20 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
-import java.net.URI;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.lang.reflect.Constructor;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;

@@ -37,14 +39,34 @@
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogSegmentOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -269,7 +291,7 @@ synchronized void openForWrite() throws IOException {
       IOUtils.closeStream(s);
     }
 
-    startLogSegment(segmentTxId, true);
+    startLogSegmentAndWriteHeaderTxn(segmentTxId);
     assert state == State.IN_SEGMENT : "Bad state: " + state;
   }

@@ -864,18 +886,48 @@ synchronized long rollEditLog() throws IOException {
     endCurrentLogSegment(true);
 
     long nextTxId = getLastWrittenTxId() + 1;
-    startLogSegment(nextTxId, true);
+    startLogSegmentAndWriteHeaderTxn(nextTxId);
 
     assert curSegmentTxId == nextTxId;
     return nextTxId;
   }
 
+  /**
+   * Remote namenode just has started a log segment, start log segment locally.
+   */
+  public synchronized void startLogSegment(long txid,
+      boolean abortCurrentLogSegment) throws IOException {
+    LOG.info("Namenode started a new log segment at txid " + txid);
+    if (isSegmentOpen()) {
+      if (getLastWrittenTxId() == txid - 1) {
+        //In sync with the NN, so end and finalize the current segment
+        endCurrentLogSegment(false);
+      } else {
+        //Missed some transactions: probably lost contact with NN temporarily.
+        final String mess = "Cannot start a new log segment at txid " + txid
+            + " since only up to txid " + getLastWrittenTxId()
+            + " have been written in the log segment starting at "
+            + getCurSegmentTxId() + ".";
+        if (abortCurrentLogSegment) {
+          //Mark the current segment as aborted.
+          LOG.warn(mess);
+          abortCurrentLogSegment();
+        } else {
+          throw new IOException(mess);
+        }
+      }
+    }
+    setNextTxId(txid);
+    startLogSegment(txid);
+  }
+
   /**
    * Start writing to the log segment with the given txid.
    * Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state.
    */
-  synchronized void startLogSegment(final long segmentTxId,
-      boolean writeHeaderTxn) throws IOException {
+  private void startLogSegment(final long segmentTxId) throws IOException {
+    assert Thread.holdsLock(this);
     LOG.info("Starting log segment at " + segmentTxId);
     Preconditions.checkArgument(segmentTxId > 0,
         "Bad txid: %s", segmentTxId);
@@ -903,13 +955,16 @@ synchronized void startLogSegment(final long segmentTxId,
     curSegmentTxId = segmentTxId;
     state = State.IN_SEGMENT;
+  }
 
+  synchronized void startLogSegmentAndWriteHeaderTxn(final long segmentTxId
+      ) throws IOException {
+    startLogSegment(segmentTxId);
 
-    if (writeHeaderTxn) {
     logEdit(LogSegmentOp.getInstance(cache.get(),
         FSEditLogOpCodes.OP_START_LOG_SEGMENT));
     logSync();
-    }
   }
 
   /**
    * Finalize the current log segment.
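
A note on the refactoring pattern in these two hunks: the boolean writeHeaderTxn parameter is replaced by two entry points, so call sites read as intent rather than as an opaque flag. A hedged sketch of the shape, with hypothetical stub types rather than the real FSEditLog internals:

import java.io.IOException;

// Hypothetical sketch of splitting startLogSegment(txid, writeHeaderTxn) in two.
abstract class SegmentedLog {
  // Core state transition (BETWEEN_LOG_SEGMENTS -> IN_SEGMENT).
  // Private and unsynchronized: callers must already hold the lock.
  private void startLogSegment(long segmentTxId) throws IOException {
    assert Thread.holdsLock(this);
    openSegment(segmentTxId);
  }

  // Replaces startLogSegment(txid, true): opens the segment, then writes and
  // syncs the OP_START_LOG_SEGMENT header transaction.
  synchronized void startLogSegmentAndWriteHeaderTxn(long segmentTxId)
      throws IOException {
    startLogSegment(segmentTxId);
    logHeaderTxn();
    sync();
  }

  abstract void openSegment(long txid) throws IOException;
  abstract void logHeaderTxn() throws IOException;
  abstract void sync() throws IOException;
}

The public startLogSegment(txid, abortCurrentLogSegment) added earlier in this file covers the remote-triggered roll, i.e. the path that formerly passed writeHeaderTxn=false.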
@@ -1057,6 +1112,17 @@ private synchronized BackupJournalManager findBackupJournal(
     return null;
   }
 
+  /** Write the batch of edits to edit log. */
+  public synchronized void journal(long firstTxId, int numTxns, byte[] data) {
+    final long expectedTxId = getLastWrittenTxId() + 1;
+    Preconditions.checkState(firstTxId == expectedTxId,
+        "received txid batch starting at %s but expected txid %s",
+        firstTxId, expectedTxId);
+    setNextTxId(firstTxId + numTxns - 1);
+    logEdit(data.length, data);
+    logSync();
+  }
+
   /**
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
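
The new FSEditLog.journal(..) is the batch-append path moved out of BackupImage; its key invariant is that a batch must start exactly one past the last written txid. A runnable sketch of just that check, with a plain IllegalStateException standing in for Guava's Preconditions.checkState:

// Hypothetical minimal model of the txid-contiguity check in FSEditLog.journal(..).
class JournalBuffer {
  private long lastWrittenTxId;

  JournalBuffer(long lastWrittenTxId) {
    this.lastWrittenTxId = lastWrittenTxId;
  }

  synchronized void journal(long firstTxId, int numTxns, byte[] data) {
    final long expectedTxId = lastWrittenTxId + 1;
    if (firstTxId != expectedTxId) {
      throw new IllegalStateException("received txid batch starting at "
          + firstTxId + " but expected txid " + expectedTxId);
    }
    // Accept the batch: its last txid becomes the new high-water mark.
    // (The real method then writes the raw bytes with logEdit and calls logSync.)
    lastWrittenTxId = firstTxId + numTxns - 1;
  }

  public static void main(String[] args) {
    JournalBuffer buf = new JournalBuffer(41);
    buf.journal(42, 3, new byte[0]);  // ok: 42 == 41 + 1; high-water mark -> 44
    buf.journal(50, 1, new byte[0]);  // throws: expected txid 45
  }
}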

FSImage.java

@@ -823,7 +823,7 @@ public synchronized void saveNamespace(FSNamesystem source) throws IOException {
       storage.writeAll();
     } finally {
       if (editLogWasOpen) {
-        editLog.startLogSegment(imageTxId + 1, true);
+        editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1);
         // Take this opportunity to note the current transaction.
         // Even if the namespace save was cancelled, this marker
         // is only used to determine what transaction ID is required

TestJournalService.java

@@ -43,7 +43,7 @@ public class TestJournalService {
   private Configuration conf = new HdfsConfiguration();
 
   /**
-   * Test calls backs {@link JournalListener#rollLogs(JournalService, long)} and
+   * Test calls backs {@link JournalListener#startLogSegment(JournalService, long)} and
    * {@link JournalListener#journal(JournalService, long, int, byte[])} are
    * called.
    */

@@ -85,7 +85,7 @@ private JournalService startJournalService(JournalListener listener)
    */
   private void verifyRollLogsCallback(JournalService s, JournalListener l)
       throws IOException {
-    Mockito.verify(l, Mockito.times(1)).rollLogs(Mockito.eq(s), Mockito.anyLong());
+    Mockito.verify(l, Mockito.times(1)).startLogSegment(Mockito.eq(s), Mockito.anyLong());
   }
 
   /**
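
For completeness, a self-contained illustration of the Mockito verification pattern used in the updated test; Listener here is a simplified, hypothetical stand-in for JournalListener, not the real interface:

import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class StartLogSegmentVerifyExample {
  public interface Listener {
    void startLogSegment(Object service, long txid) throws java.io.IOException;
  }

  public static void main(String[] args) throws Exception {
    Listener l = mock(Listener.class);
    Object service = new Object();

    // In the real test this call happens inside JournalService when the
    // startLogSegment RPC arrives; here it is invoked directly.
    l.startLogSegment(service, 1L);

    // Same shape as verifyRollLogsCallback after the rename:
    // exactly one invocation, on this service, with any txid.
    verify(l, times(1)).startLogSegment(eq(service), anyLong());
  }
}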