HDFS-3840. JournalNodes log JournalNotFormattedException backtrace error before being formatted. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1383252 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8a8c9c18d3
commit
a93ba1648a
@ -68,3 +68,5 @@ HDFS-3914. QJM: acceptRecovery should abort current segment (todd)
|
||||
HDFS-3915. QJM: Failover fails with auth error in secure cluster (todd)
|
||||
|
||||
HDFS-3906. QJM: quorum timeout on failover with large log segment (todd)
|
||||
|
||||
HDFS-3840. JournalNodes log JournalNotFormattedException backtrace error before being formatted (todd)
|
||||
|
@ -90,6 +90,11 @@ public ListenableFuture<Void> finalizeLogSegment(
|
||||
*/
|
||||
public ListenableFuture<Void> format(NamespaceInfo nsInfo);
|
||||
|
||||
/**
|
||||
* @return whether or not the remote node has any valid data.
|
||||
*/
|
||||
public ListenableFuture<Boolean> isFormatted();
|
||||
|
||||
/**
|
||||
* @return the state of the last epoch on the target node.
|
||||
*/
|
||||
|
@ -216,29 +216,7 @@ public QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
|
||||
public QuorumCall<AsyncLogger, Boolean> isFormatted() {
|
||||
Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
|
||||
for (AsyncLogger logger : loggers) {
|
||||
final SettableFuture<Boolean> ret = SettableFuture.create();
|
||||
ListenableFuture<GetJournalStateResponseProto> jstate =
|
||||
logger.getJournalState();
|
||||
Futures.addCallback(jstate, new FutureCallback<GetJournalStateResponseProto>() {
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (t instanceof RemoteException) {
|
||||
t = ((RemoteException)t).unwrapRemoteException();
|
||||
}
|
||||
if (t instanceof JournalNotFormattedException) {
|
||||
ret.set(false);
|
||||
} else {
|
||||
ret.setException(t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(GetJournalStateResponseProto jstate) {
|
||||
ret.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
calls.put(logger, ret);
|
||||
calls.put(logger, logger.isFormatted());
|
||||
}
|
||||
return QuorumCall.create(calls);
|
||||
}
|
||||
|
@ -293,6 +293,16 @@ public void run() {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Boolean> isFormatted() {
|
||||
return executor.submit(new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws IOException {
|
||||
return getProxy().isFormatted(journalId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
|
||||
return executor.submit(new Callable<GetJournalStateResponseProto>() {
|
||||
|
@ -47,6 +47,12 @@
|
||||
public interface QJournalProtocol {
|
||||
public static final long versionID = 1L;
|
||||
|
||||
/**
|
||||
* @return true if the given journal has been formatted and
|
||||
* contains valid data.
|
||||
*/
|
||||
public boolean isFormatted(String journalId) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the current state of the journal, including the most recent
|
||||
* epoch number and the HTTP port.
|
||||
|
@ -32,6 +32,8 @@
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
|
||||
@ -67,6 +69,22 @@ public QJournalProtocolServerSideTranslatorPB(QJournalProtocol impl) {
|
||||
this.impl = impl;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public IsFormattedResponseProto isFormatted(RpcController controller,
|
||||
IsFormattedRequestProto request) throws ServiceException {
|
||||
try {
|
||||
boolean ret = impl.isFormatted(
|
||||
convert(request.getJid()));
|
||||
return IsFormattedResponseProto.newBuilder()
|
||||
.setIsFormatted(ret)
|
||||
.build();
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public GetJournalStateResponseProto getJournalState(RpcController controller,
|
||||
GetJournalStateRequestProto request) throws ServiceException {
|
||||
|
@ -33,6 +33,8 @@
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
|
||||
@ -78,6 +80,20 @@ public void close() {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isFormatted(String journalId) throws IOException {
|
||||
try {
|
||||
IsFormattedRequestProto req = IsFormattedRequestProto.newBuilder()
|
||||
.setJid(convertJournalId(journalId))
|
||||
.build();
|
||||
IsFormattedResponseProto resp = rpcProxy.isFormatted(
|
||||
NULL_CONTROLLER, req);
|
||||
return resp.getIsFormatted();
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetJournalStateResponseProto getJournalState(String jid)
|
||||
throws IOException {
|
||||
|
@ -402,10 +402,14 @@ private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOExcept
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized boolean isFormatted() {
|
||||
return storage.isFormatted();
|
||||
}
|
||||
|
||||
private void checkFormatted() throws JournalNotFormattedException {
|
||||
if (!storage.isFormatted()) {
|
||||
throw new JournalNotFormattedException("Journal " + storage +
|
||||
" not formatted");
|
||||
if (!isFormatted()) {
|
||||
throw new JournalNotFormattedException("Journal " +
|
||||
storage.getSingularStorageDir() + " not formatted");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,6 +107,11 @@ static InetSocketAddress getAddress(Configuration conf) {
|
||||
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFormatted(String journalId) throws IOException {
|
||||
return jn.getOrCreateJournal(journalId).isFormatted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetJournalStateResponseProto getJournalState(String journalId)
|
||||
throws IOException {
|
||||
|
@ -116,6 +116,17 @@ message PurgeLogsRequestProto {
|
||||
message PurgeLogsResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* isFormatted()
|
||||
*/
|
||||
message IsFormattedRequestProto {
|
||||
required JournalIdProto jid = 1;
|
||||
}
|
||||
|
||||
message IsFormattedResponseProto {
|
||||
required bool isFormatted = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* getJournalState()
|
||||
*/
|
||||
@ -210,6 +221,8 @@ message AcceptRecoveryResponseProto {
|
||||
* See the request and response for details of rpc call.
|
||||
*/
|
||||
service QJournalProtocolService {
|
||||
rpc isFormatted(IsFormattedRequestProto) returns (IsFormattedResponseProto);
|
||||
|
||||
rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
|
||||
|
||||
rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
|
||||
|
@ -156,10 +156,13 @@ public void testFormatResetsCachedValues() throws Exception {
|
||||
|
||||
assertEquals(12345L, journal.getLastPromisedEpoch());
|
||||
assertEquals(12345L, journal.getLastWriterEpoch());
|
||||
|
||||
assertTrue(journal.isFormatted());
|
||||
|
||||
journal.format(FAKE_NSINFO_2);
|
||||
|
||||
assertEquals(0, journal.getLastPromisedEpoch());
|
||||
assertEquals(0, journal.getLastWriterEpoch());
|
||||
assertTrue(journal.isFormatted());
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user