HDFS-5074. Allow starting up from an fsimage checkpoint in the middle of a segment. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1550016 13f79535-47bb-0310-9956-ffa450edef68
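
In short: the NameNode can now load an fsimage checkpoint whose transaction ID falls inside an edit log segment, rather than only at segment boundaries. To support this, the forReading flag is removed from the edit-log enumeration paths (selectInputStreams, getEditLogManifest, getRemoteEditLogs) throughout the JournalManager implementations, and a segment that merely contains the requested transaction is now returned to the caller instead of triggering an IllegalStateException. A rough sketch of the simplified signatures after this patch (illustrative, not the complete interfaces):

    // Callers no longer declare whether they intend to read; a segment whose
    // range merely contains fromTxId is returned and the caller decides how
    // to use it.
    void selectInputStreams(Collection<EditLogInputStream> streams,
        long fromTxId, boolean inProgressOk) throws IOException;

    List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
        boolean inProgressOk) throws IOException;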
parent 87734ec939
commit 512a18a8d9
@@ -642,6 +642,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5533. Symlink delete/create should be treated as DELETE/CREATE in snapshot diff
     report. (Binglin Chang via jing9)
 
+    HDFS-5074. Allow starting up from an fsimage checkpoint in the middle of a
+    segment. (Todd Lipcon via atm)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -500,14 +500,9 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
     }
   }
 
-  public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) throws IOException {
-    selectInputStreams(streams, fromTxId, inProgressOk, true);
-  }
-
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk, boolean forReading)
+      long fromTxId, boolean inProgressOk)
       throws IOException {
     List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
         inProgressOk);

@@ -109,7 +109,7 @@ public ListenableFuture<Void> finalizeLogSegment(
    * Fetch the list of edit logs available on the remote node.
    */
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
-      long fromTxnId, boolean forReading, boolean inProgressOk);
+      long fromTxnId, boolean inProgressOk);
 
   /**
    * Prepare recovery. See the HDFS-3077 design document for details.

@@ -261,13 +261,13 @@ public QuorumCall<AsyncLogger, Void> sendEdits(
   }
 
   public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
-      long fromTxnId, boolean forReading, boolean inProgressOk) {
+      long fromTxnId, boolean inProgressOk) {
     Map<AsyncLogger,
         ListenableFuture<RemoteEditLogManifest>> calls
         = Maps.newHashMap();
     for (AsyncLogger logger : loggers) {
       ListenableFuture<RemoteEditLogManifest> future =
-          logger.getEditLogManifest(fromTxnId, forReading, inProgressOk);
+          logger.getEditLogManifest(fromTxnId, inProgressOk);
       calls.put(logger, future);
     }
     return QuorumCall.create(calls);
@@ -181,6 +181,7 @@ public synchronized void setCommittedTxId(long txid) {
 
   @Override
   public void close() {
+    QuorumJournalManager.LOG.info("Closing", new Exception());
     // No more tasks may be submitted after this point.
     executor.shutdown();
     if (proxy != null) {

@@ -520,13 +521,12 @@ public Void call() throws Exception {
 
   @Override
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
-      final long fromTxnId, final boolean forReading,
-      final boolean inProgressOk) {
+      final long fromTxnId, final boolean inProgressOk) {
     return executor.submit(new Callable<RemoteEditLogManifest>() {
       @Override
       public RemoteEditLogManifest call() throws IOException {
         GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
-            journalId, fromTxnId, forReading, inProgressOk);
+            journalId, fromTxnId, inProgressOk);
         // Update the http port, since we need this to build URLs to any of the
         // returned logs.
         constructHttpServerURI(ret);

@@ -450,17 +450,12 @@ public void close() throws IOException {
     loggers.close();
   }
 
-  public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) throws IOException {
-    selectInputStreams(streams, fromTxnId, inProgressOk, true);
-  }
-
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException {
+      long fromTxnId, boolean inProgressOk) throws IOException {
 
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
-        loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk);
+        loggers.getEditLogManifest(fromTxnId, inProgressOk);
     Map<AsyncLogger, RemoteEditLogManifest> resps =
         loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
             "selectInputStreams");
@@ -123,14 +123,12 @@ public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep)
   /**
    * @param jid the journal from which to enumerate edits
    * @param sinceTxId the first transaction which the client cares about
-   * @param forReading whether or not the caller intends to read from the edit
-   *        logs
    * @param inProgressOk whether or not to check the in-progress edit log
    *        segment
    * @return a list of edit log segments since the given transaction ID.
    */
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
-      long sinceTxId, boolean forReading, boolean inProgressOk)
+      long sinceTxId, boolean inProgressOk)
       throws IOException;
 
   /**

@@ -203,7 +203,6 @@ public GetEditLogManifestResponseProto getEditLogManifest(
       return impl.getEditLogManifest(
           request.getJid().getIdentifier(),
           request.getSinceTxId(),
-          request.getForReading(),
           request.getInProgressOk());
     } catch (IOException e) {
       throw new ServiceException(e);

@@ -228,14 +228,13 @@ public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
 
   @Override
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
-      long sinceTxId, boolean forReading, boolean inProgressOk)
+      long sinceTxId, boolean inProgressOk)
       throws IOException {
     try {
       return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
           GetEditLogManifestRequestProto.newBuilder()
             .setJid(convertJournalId(jid))
             .setSinceTxId(sinceTxId)
-            .setForReading(forReading)
             .setInProgressOk(inProgressOk)
             .build());
     } catch (ServiceException e) {
@@ -630,15 +630,12 @@ private void purgePaxosDecision(long segmentTxId) throws IOException {
    * @see QJournalProtocol#getEditLogManifest(String, long)
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
-      boolean forReading, boolean inProgressOk) throws IOException {
+      boolean inProgressOk) throws IOException {
     // No need to checkRequest() here - anyone may ask for the list
     // of segments.
     checkFormatted();
 
-    // if this is for reading, ignore the in-progress editlog segment
-    inProgressOk = forReading ? false : inProgressOk;
-    List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, forReading,
-        inProgressOk);
+    List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, inProgressOk);
 
     if (inProgressOk) {
       RemoteEditLog log = null;

@@ -178,11 +178,11 @@ public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
   @SuppressWarnings("deprecation")
   @Override
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
-      long sinceTxId, boolean forReading, boolean inProgressOk)
+      long sinceTxId, boolean inProgressOk)
       throws IOException {
 
     RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
-        .getEditLogManifest(sinceTxId, forReading, inProgressOk);
+        .getEditLogManifest(sinceTxId, inProgressOk);
 
     return GetEditLogManifestResponseProto.newBuilder()
         .setManifest(PBHelper.convert(manifest))

@@ -77,7 +77,7 @@ public void purgeLogsOlderThan(long minTxIdToKeep)
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk, boolean forReading) {
+      long fromTxnId, boolean inProgressOk) {
     // This JournalManager is never used for input. Therefore it cannot
     // return any transactions
   }
@@ -286,7 +286,7 @@ synchronized void openForWrite() throws IOException {
     // Safety check: we should never start a segment if there are
     // newer txids readable.
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    journalSet.selectInputStreams(streams, segmentTxId, true, true);
+    journalSet.selectInputStreams(streams, segmentTxId, true);
     if (!streams.isEmpty()) {
       String error = String.format("Cannot start writing at txid %s " +
           "when there is a stream available for read: %s",

@@ -1037,7 +1037,7 @@ void setMetricsForTests(NameNodeMetrics metrics) {
    */
   public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
       throws IOException {
-    return journalSet.getEditLogManifest(fromTxId, true);
+    return journalSet.getEditLogManifest(fromTxId);
   }
 
   /**

@@ -1332,8 +1332,8 @@ synchronized void recoverUnclosedStreams() {
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk, boolean forReading) {
-    journalSet.selectInputStreams(streams, fromTxId, inProgressOk, forReading);
+      long fromTxId, boolean inProgressOk) throws IOException {
+    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
   }
 
   public Collection<EditLogInputStream> selectInputStreams(

@@ -1341,27 +1341,18 @@ public Collection<EditLogInputStream> selectInputStreams(
     return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
   }
 
-  /** Select a list of input streams to load */
-  public Collection<EditLogInputStream> selectInputStreams(
-      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
-      boolean inProgressOk) throws IOException {
-    return selectInputStreams(fromTxId, toAtLeastTxId, recovery, inProgressOk,
-        true);
-  }
-
   /**
    * Select a list of input streams.
    *
    * @param fromTxId first transaction in the selected streams
    * @param toAtLeast the selected streams must contain this transaction
    * @param inProgessOk set to true if in-progress streams are OK
-   * @param forReading whether or not to use the streams to load the edit log
    */
   public synchronized Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
-      boolean inProgressOk, boolean forReading) throws IOException {
+      boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    selectInputStreams(streams, fromTxId, inProgressOk, forReading);
+    selectInputStreams(streams, fromTxId, inProgressOk);
 
     try {
       checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
@@ -167,19 +167,13 @@ public void purgeLogsOlderThan(long minTxIdToKeep)
   /**
    * Find all editlog segments starting at or above the given txid.
    * @param fromTxId the txnid which to start looking
-   * @param forReading whether or not the caller intends to read from the edit
-   *        logs
    * @param inProgressOk whether or not to include the in-progress edit log
    *        segment
    * @return a list of remote edit logs
    * @throws IOException if edit logs cannot be listed.
    */
   public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
-      boolean forReading, boolean inProgressOk) throws IOException {
-    // make sure not reading in-progress edit log, i.e., if forReading is true,
-    // we should ignore the in-progress edit log.
-    Preconditions.checkArgument(!(forReading && inProgressOk));
-
+      boolean inProgressOk) throws IOException {
     File currentDir = sd.getCurrentDir();
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(

@@ -192,16 +186,11 @@ public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
       if (elf.getFirstTxId() >= firstTxId) {
         ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
       } else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
-        // If the firstTxId is in the middle of an edit log segment
-        if (forReading) {
-          // Note that this behavior is different from getLogFiles below.
-          throw new IllegalStateException("Asked for firstTxId " + firstTxId
-              + " which is in the middle of file " + elf.file);
-        } else {
+        // If the firstTxId is in the middle of an edit log segment. Return this
+        // anyway and let the caller figure out whether it wants to use it.
         ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
       }
     }
-    }
 
     Collections.sort(ret);
 
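
The behavioral change above is easiest to see with the finalized segments used by TestFileJournalManager later in this patch; a hedged before/after sketch:

    // Finalized segments on disk: [1,100], [101,200], [1001,1100]
    List<RemoteEditLog> logs = fjm.getRemoteEditLogs(150, false);
    // Before: IllegalStateException ("... 150 which is in the middle ...")
    // After:  returns [101,200] and [1001,1100]; the caller decides whether
    //         to start reading mid-segment.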
@@ -260,7 +249,7 @@ static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
   @Override
   synchronized public void selectInputStreams(
       Collection<EditLogInputStream> streams, long fromTxId,
-      boolean inProgressOk, boolean forReading) throws IOException {
+      boolean inProgressOk) throws IOException {
     List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
     LOG.debug(this + ": selecting input streams starting at " + fromTxId +
         (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +

@@ -233,12 +233,10 @@ public void apply(JournalAndStream jas) throws IOException {
    * may not be sorted-- this is up to the caller.
    * @param fromTxId The transaction ID to start looking for streams at
    * @param inProgressOk Should we consider unfinalized streams?
-   * @param forReading Whether or not the caller intends to read from
-   *        the returned streams.
    */
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk, boolean forReading) {
+      long fromTxId, boolean inProgressOk) {
     final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             EDIT_LOG_INPUT_STREAM_COMPARATOR);

@@ -248,8 +246,7 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
         continue;
       }
       try {
-        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk,
-            forReading);
+        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
       } catch (IOException ioe) {
         LOG.warn("Unable to determine input streams from " + jas.getManager() +
             ". Skipping.", ioe);

@@ -582,20 +579,20 @@ public void apply(JournalAndStream jas) throws IOException {
 
   /**
    * Return a manifest of what finalized edit logs are available. All available
-   * edit logs are returned starting from the transaction id passed.
+   * edit logs are returned starting from the transaction id passed. If
+   * 'fromTxId' falls in the middle of a log, that log is returned as well.
    *
    * @param fromTxId Starting transaction id to read the logs.
    * @return RemoteEditLogManifest object.
    */
-  public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId,
-      boolean forReading) {
+  public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
     // Collect RemoteEditLogs available from each FileJournalManager
     List<RemoteEditLog> allLogs = Lists.newArrayList();
     for (JournalAndStream j : journals) {
       if (j.getManager() instanceof FileJournalManager) {
         FileJournalManager fjm = (FileJournalManager)j.getManager();
         try {
-          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, forReading, false));
+          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, false));
         } catch (Throwable t) {
           LOG.warn("Cannot list edit logs in " + fjm, t);
         }
@@ -42,13 +42,11 @@ interface LogsPurgeable {
    *
    * @param fromTxId the first transaction id we want to read
    * @param inProgressOk whether or not in-progress streams should be returned
-   * @param forReading whether or not the caller intends to read from the edit logs
-   *
    * @return a list of streams
    * @throws IOException if the underlying storage has an error or is otherwise
    *         inaccessible
    */
   void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk, boolean forReading) throws IOException;
+      long fromTxId, boolean inProgressOk) throws IOException;
 
 }

@@ -108,7 +108,7 @@ public void purgeOldStorage() throws IOException {
     long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
 
     ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
-    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
+    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
     Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
       @Override
       public int compare(EditLogInputStream a, EditLogInputStream b) {

@@ -827,7 +827,7 @@ public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk, boolean forReading) {
+      long fromTxId, boolean inProgressOk) {
     Iterator<StorageDirectory> iter = storage.dirIterator();
     while (iter.hasNext()) {
       StorageDirectory dir = iter.next();

@@ -228,7 +228,7 @@ private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
     try {
       Collection<EditLogInputStream> streams =
         image.getEditLog().selectInputStreams(
-          firstTxIdInLogs, curTxIdOnOtherNode, null, true, false);
+          firstTxIdInLogs, curTxIdOnOtherNode, null, true);
       for (EditLogInputStream stream : streams) {
         IOUtils.closeStream(stream);
       }

@@ -165,7 +165,7 @@ FSEditLog getEditLog() {
   }
 
   @VisibleForTesting
-  void setEditLog(FSEditLog editLog) {
+  public void setEditLog(FSEditLog editLog) {
     this.editLog = editLog;
   }
 

@@ -178,7 +178,7 @@ message GetEditLogManifestRequestProto {
   required JournalIdProto jid = 1;
   required uint64 sinceTxId = 2; // Transaction ID
   // Whether or not the client will be reading from the returned streams.
-  optional bool forReading = 3 [default = true];
+  // optional bool forReading = 3 [default = true]; <obsolete, do not reuse>
   optional bool inProgressOk = 4 [default = false];
 }
 

@@ -1487,8 +1487,9 @@ public synchronized void shutdownNameNode(int nnIndex) {
    */
   public synchronized void restartNameNodes() throws IOException {
     for (int i = 0; i < nameNodes.length; i++) {
-      restartNameNode(i);
+      restartNameNode(i, false);
     }
+    waitActive();
   }
 
   /**
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+
+public class MiniQJMHACluster {
+  private MiniDFSCluster cluster;
+  private MiniJournalCluster journalCluster;
+  private final Configuration conf;
+
+  private static String NAMESERVICE = "ns1";
+  private static final String NN1 = "nn1";
+  private static final String NN2 = "nn2";
+  private static final int NN1_IPC_PORT = 10000;
+  private static final int NN1_INFO_PORT = 10001;
+  private static final int NN2_IPC_PORT = 10002;
+  private static final int NN2_INFO_PORT = 10003;
+
+  public static class Builder {
+    private final Configuration conf;
+    private final MiniDFSCluster.Builder dfsBuilder;
+
+    public Builder(Configuration conf) {
+      this.conf = conf;
+      this.dfsBuilder = new MiniDFSCluster.Builder(conf);
+    }
+
+    public MiniDFSCluster.Builder getDfsBuilder() {
+      return dfsBuilder;
+    }
+
+    public MiniQJMHACluster build() throws IOException {
+      return new MiniQJMHACluster(this);
+    }
+  }
+
+  public static MiniDFSNNTopology createDefaultTopology() {
+    return new MiniDFSNNTopology()
+      .addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
+        new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
+            .setHttpPort(NN1_INFO_PORT)).addNN(
+        new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
+            .setHttpPort(NN2_INFO_PORT)));
+  }
+
+  private MiniQJMHACluster(Builder builder) throws IOException {
+    this.conf = builder.conf;
+    // start 3 journal nodes
+    journalCluster = new MiniJournalCluster.Builder(conf).format(true)
+        .build();
+    URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
+
+    // start cluster with 2 NameNodes
+    MiniDFSNNTopology topology = createDefaultTopology();
+
+    initHAConf(journalURI, builder.conf);
+
+    // First start up the NNs just to format the namespace. The MinIDFSCluster
+    // has no way to just format the NameNodes without also starting them.
+    cluster = builder.dfsBuilder.nnTopology(topology)
+        .manageNameDfsSharedDirs(false).build();
+    cluster.waitActive();
+    cluster.shutdown();
+
+    // initialize the journal nodes
+    Configuration confNN0 = cluster.getConfiguration(0);
+    NameNode.initializeSharedEdits(confNN0, true);
+
+    // restart the cluster
+    cluster.restartNameNodes();
+  }
+
+  private Configuration initHAConf(URI journalURI, Configuration conf) {
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
+        journalURI.toString());
+
+    String address1 = "127.0.0.1:" + NN1_IPC_PORT;
+    String address2 = "127.0.0.1:" + NN2_IPC_PORT;
+    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
+        NAMESERVICE, NN1), address1);
+    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
+        NAMESERVICE, NN2), address2);
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
+    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
+        NN1 + "," + NN2);
+    conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
+        ConfiguredFailoverProxyProvider.class.getName());
+    conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
+
+    return conf;
+  }
+
+  public MiniDFSCluster getDfsCluster() {
+    return cluster;
+  }
+
+  public MiniJournalCluster getJournalCluster() {
+    return journalCluster;
+  }
+
+  public void shutdown() throws IOException {
+    cluster.shutdown();
+    journalCluster.shutdown();
+  }
+}
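
The new MiniQJMHACluster harness above bundles a two-NameNode MiniDFSCluster with a three-node MiniJournalCluster and performs the format / initializeSharedEdits / restart dance in its constructor. A minimal usage sketch, mirroring how the updated tests below drive it:

    Configuration conf = new Configuration();
    MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
    builder.getDfsBuilder().numDataNodes(0);       // tune the wrapped MiniDFSCluster
    MiniQJMHACluster qjmCluster = builder.build();
    MiniDFSCluster dfs = qjmCluster.getDfsCluster();
    dfs.transitionToActive(0);                     // make nn0 active, as the tests do
    // ... exercise HA behavior ...
    qjmCluster.shutdown();                         // stops both DFS and journal clusters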
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.qjournal;
 
 import static org.junit.Assert.*;
-import static org.junit.Assume.*;
 
 import java.io.File;
 import java.io.IOException;

@@ -916,7 +916,7 @@ public void testSelectInputStreamsNotOnBoundary() throws Exception {
         NNStorage.getFinalizedEditsFileName(41, 50));
 
     ArrayList<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    qjm.selectInputStreams(streams, 25, false, false);
+    qjm.selectInputStreams(streams, 25, false);
 
     verifyEdits(streams, 25, 50);
   }

@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.ipc.Server;
@@ -180,6 +181,16 @@ public static FSImage spyOnFsImage(NameNode nn1) {
     return spy;
   }
 
+  public static FSEditLog spyOnEditLog(NameNode nn) {
+    FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
+    nn.getFSImage().setEditLogForTesting(spyEditLog);
+    EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
+    if (tailer != null) {
+      tailer.setEditLog(spyEditLog);
+    }
+    return spyEditLog;
+  }
+
   public static JournalSet spyOnJournalSet(NameNode nn) {
     FSEditLog editLog = nn.getFSImage().getEditLog();
     JournalSet js = Mockito.spy(editLog.getJournalSet());
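
NameNodeAdapter.spyOnEditLog centralizes the Mockito plumbing that TestFailureToReadEdits previously did by hand: it swaps a spy FSEditLog into the FSImage and, if an EditLogTailer is running, into the tailer as well. A sketch of the intended use, taken from the causeFailureOnEditLogRead change further down:

    FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
    doAnswer(new LimitedEditLogAnswer()).when(spyEditLog).selectInputStreams(
        anyLong(), anyLong(), (MetaRecoveryContext) anyObject(), anyBoolean());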
@@ -82,7 +82,7 @@ static long getNumberOfTransactions(FileJournalManager jm, long fromTxId,
     final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
-    jm.selectInputStreams(allStreams, fromTxId, inProgressOk, true);
+    jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
     EditLogInputStream elis = null;
     try {
       while ((elis = allStreams.poll()) != null) {

@@ -378,14 +378,8 @@ public void testGetRemoteEditLog() throws IOException {
     FileJournalManager fjm = new FileJournalManager(conf, sd, null);
     assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1));
     assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101));
+    assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 150));
     assertEquals("[1001,1100]", getLogsAsString(fjm, 201));
-    try {
-      assertEquals("[]", getLogsAsString(fjm, 150));
-      fail("Did not throw when asking for a txn in the middle of a log");
-    } catch (IllegalStateException ioe) {
-      GenericTestUtils.assertExceptionContains(
-          "150 which is in the middle", ioe);
-    }
     assertEquals("Asking for a newer log than exists should return empty list",
         "", getLogsAsString(fjm, 9999));
   }

@@ -404,7 +398,7 @@ private static EditLogInputStream getJournalInputStream(JournalManager jm,
     final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
-    jm.selectInputStreams(allStreams, txId, inProgressOk, true);
+    jm.selectInputStreams(allStreams, txId, inProgressOk);
     EditLogInputStream elis = null, ret;
     try {
       while ((elis = allStreams.poll()) != null) {

@@ -482,6 +476,6 @@ public void testExcludeInProgressStreams() throws CorruptionException,
 
   private static String getLogsAsString(
       FileJournalManager fjm, long firstTxId) throws IOException {
-    return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, true, false));
+    return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, false));
   }
 }

@@ -170,7 +170,7 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
 
     @Override
     public void selectInputStreams(Collection<EditLogInputStream> streams,
-        long fromTxnId, boolean inProgressOk, boolean forReading) {
+        long fromTxnId, boolean inProgressOk) {
     }
 
     @Override

@@ -324,7 +324,7 @@ NNStorage mockStorage() throws IOException {
     }
 
     @SuppressWarnings("unchecked")
-    public FSEditLog mockEditLog(StoragePurger purger) {
+    public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
       final List<JournalManager> jms = Lists.newArrayList();
       final JournalSet journalSet = new JournalSet(0);
       for (FakeRoot root : dirRoots.values()) {

@@ -360,12 +360,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
         public Void answer(InvocationOnMock invocation) throws Throwable {
           Object[] args = invocation.getArguments();
           journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
-              (long)((Long)args[1]), (boolean)((Boolean)args[2]),
-              (boolean)((Boolean)args[3]));
+              (long)((Long)args[1]), (boolean)((Boolean)args[2]));
           return null;
         }
       }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
-          Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
+          Mockito.anyLong(), Mockito.anyBoolean());
       return mockLog;
     }
   }
@@ -17,24 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

@@ -45,50 +39,22 @@
  * Test BootstrapStandby when QJM is used for shared edits.
  */
 public class TestBootstrapStandbyWithQJM {
-  private static final String NAMESERVICE = "ns1";
-  private static final String NN1 = "nn1";
-  private static final String NN2 = "nn2";
-  private static final int NUM_JN = 3;
-  private static final int NN1_IPC_PORT = 10000;
-  private static final int NN1_INFO_PORT = 10001;
-  private static final int NN2_IPC_PORT = 10002;
-  private static final int NN2_INFO_PORT = 10003;
-
+  private MiniQJMHACluster miniQjmHaCluster;
   private MiniDFSCluster cluster;
   private MiniJournalCluster jCluster;
 
   @Before
   public void setup() throws Exception {
-    // start 3 journal nodes
-    jCluster = new MiniJournalCluster.Builder(new Configuration()).format(true)
-        .numJournalNodes(NUM_JN).build();
-    URI journalURI = jCluster.getQuorumJournalURI(NAMESERVICE);
+    Configuration conf = new Configuration();
+    // Turn off IPC client caching, so that the suite can handle
+    // the restart of the daemons between test cases.
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
 
-    // start cluster with 2 NameNodes
-    MiniDFSNNTopology topology = new MiniDFSNNTopology()
-        .addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
-            new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
-                .setHttpPort(NN1_INFO_PORT)).addNN(
-            new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
-                .setHttpPort(NN2_INFO_PORT)));
-
-    Configuration conf = initHAConf(journalURI);
-    cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
-        .numDataNodes(1).manageNameDfsSharedDirs(false).build();
-    cluster.waitActive();
-
-    Configuration confNN0 = new Configuration(conf);
-    cluster.shutdown();
-    // initialize the journal nodes
-    confNN0.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
-    NameNode.initializeSharedEdits(confNN0, true);
-
-    // restart the cluster
-    cluster = new MiniDFSCluster.Builder(conf).format(false)
-        .nnTopology(topology).numDataNodes(1).manageNameDfsSharedDirs(false)
-        .build();
-    cluster.waitActive();
+    miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = miniQjmHaCluster.getDfsCluster();
+    jCluster = miniQjmHaCluster.getJournalCluster();
 
     // make nn0 active
     cluster.transitionToActive(0);

@@ -109,27 +75,6 @@ public void cleanup() throws IOException {
     }
   }
 
-  private Configuration initHAConf(URI journalURI) {
-    Configuration conf = new Configuration();
-    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
-        journalURI.toString());
-
-    String address1 = "127.0.0.1:" + NN1_IPC_PORT;
-    String address2 = "127.0.0.1:" + NN2_IPC_PORT;
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        NAMESERVICE, NN1), address1);
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        NAMESERVICE, NN2), address2);
-    conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
-    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
-        NN1 + "," + NN2);
-    conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
-        ConfiguredFailoverProxyProvider.class.getName());
-    conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
-
-    return conf;
-  }
-
   /** BootstrapStandby when the existing NN is standby */
   @Test
   public void testBootstrapStandbyWithStandbyNN() throws Exception {
@@ -27,6 +27,7 @@
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 

@@ -37,6 +38,8 @@
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;

@@ -48,23 +51,49 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.collect.ImmutableList;
 
+@RunWith(Parameterized.class)
 public class TestFailureToReadEdits {
 
   private static final String TEST_DIR1 = "/test1";
   private static final String TEST_DIR2 = "/test2";
   private static final String TEST_DIR3 = "/test3";
 
+  private final TestType clusterType;
   private Configuration conf;
   private MiniDFSCluster cluster;
+  private MiniQJMHACluster miniQjmHaCluster; // for QJM case only
   private NameNode nn0;
   private NameNode nn1;
   private FileSystem fs;
 
+  private static enum TestType {
+    SHARED_DIR_HA,
+    QJM_HA;
+  };
+
+  /**
+   * Run this suite of tests both for QJM-based HA and for file-based
+   * HA.
+   */
+  @Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        { TestType.SHARED_DIR_HA },
+        { TestType.QJM_HA } });
+  }
+
+  public TestFailureToReadEdits(TestType clusterType) {
+    this.clusterType = clusterType;
+  }
+
   @Before
   public void setUpCluster() throws Exception {
     conf = new Configuration();

@@ -74,16 +103,19 @@ public void setUpCluster() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     HAUtil.setAllowStandbyReads(conf, true);
 
-    MiniDFSNNTopology topology = new MiniDFSNNTopology()
-      .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-        .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10041))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10042)));
+    if (clusterType == TestType.SHARED_DIR_HA) {
+      MiniDFSNNTopology topology = MiniQJMHACluster.createDefaultTopology();
       cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(topology)
         .numDataNodes(0)
         .checkExitOnShutdown(false)
         .build();
+    } else {
+      Builder builder = new MiniQJMHACluster.Builder(conf);
+      builder.getDfsBuilder().numDataNodes(0).checkExitOnShutdown(false);
+      miniQjmHaCluster = builder.build();
+      cluster = miniQjmHaCluster.getDfsCluster();
+    }
     cluster.waitActive();
 
     nn0 = cluster.getNameNode(0);

@@ -99,9 +131,15 @@ public void tearDownCluster() throws Exception {
       fs.close();
     }
 
+    if (clusterType == TestType.SHARED_DIR_HA) {
       if (cluster != null) {
         cluster.shutdown();
       }
+    } else {
+      if (miniQjmHaCluster != null) {
+        miniQjmHaCluster.shutdown();
+      }
+    }
   }
 
   /**

@@ -259,13 +297,10 @@ public void testFailureToReadEditsOnTransitionToActive() throws Exception {
   }
 
   private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException {
-    FSEditLog spyEditLog = spy(nn1.getNamesystem().getEditLogTailer()
-        .getEditLog());
+    FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
     LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
     doAnswer(answer).when(spyEditLog).selectInputStreams(
         anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean());
-    nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
 
     return answer;
   }