HDFS-13805. Journal Nodes should allow to format non-empty directories with -force option. Contributed by Surendra Singh Lilhore.
This commit is contained in:
parent
ca29fb754e
commit
96c4575d73
@ -89,8 +89,9 @@ public ListenableFuture<Void> finalizeLogSegment(
|
||||
/**
|
||||
* Format the log directory.
|
||||
* @param nsInfo the namespace info to format with
|
||||
* @param force the force option to format
|
||||
*/
|
||||
public ListenableFuture<Void> format(NamespaceInfo nsInfo);
|
||||
public ListenableFuture<Void> format(NamespaceInfo nsInfo, boolean force);
|
||||
|
||||
/**
|
||||
* @return whether or not the remote node has any valid data.
|
||||
|
@ -299,12 +299,12 @@ public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
|
||||
return QuorumCall.create(calls);
|
||||
}
|
||||
|
||||
QuorumCall<AsyncLogger,Void> format(NamespaceInfo nsInfo) {
|
||||
QuorumCall<AsyncLogger, Void> format(NamespaceInfo nsInfo, boolean force) {
|
||||
Map<AsyncLogger, ListenableFuture<Void>> calls =
|
||||
Maps.newHashMap();
|
||||
for (AsyncLogger logger : loggers) {
|
||||
ListenableFuture<Void> future =
|
||||
logger.format(nsInfo);
|
||||
logger.format(nsInfo, force);
|
||||
calls.put(logger, future);
|
||||
}
|
||||
return QuorumCall.create(calls);
|
||||
|
@ -502,11 +502,12 @@ private synchronized void unreserveQueueSpace(int size) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
|
||||
public ListenableFuture<Void> format(final NamespaceInfo nsInfo,
|
||||
final boolean force) {
|
||||
return singleThreadExecutor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
getProxy().format(journalId, nameServiceId, nsInfo);
|
||||
getProxy().format(journalId, nameServiceId, nsInfo, force);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
@ -220,8 +220,8 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
|
||||
}
|
||||
|
||||
@Override
|
||||
public void format(NamespaceInfo nsInfo) throws IOException {
|
||||
QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
|
||||
public void format(NamespaceInfo nsInfo, boolean force) throws IOException {
|
||||
QuorumCall<AsyncLogger, Void> call = loggers.format(nsInfo, force);
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
|
||||
"format");
|
||||
|
@ -68,7 +68,7 @@ GetJournalStateResponseProto getJournalState(String journalId,
|
||||
* Format the underlying storage for the given namespace.
|
||||
*/
|
||||
void format(String journalId, String nameServiceId,
|
||||
NamespaceInfo nsInfo) throws IOException;
|
||||
NamespaceInfo nsInfo, boolean force) throws IOException;
|
||||
|
||||
/**
|
||||
* Begin a new epoch. See the HDFS-3077 design doc for details.
|
||||
|
@ -147,7 +147,7 @@ public FormatResponseProto format(RpcController controller,
|
||||
try {
|
||||
impl.format(request.getJid().getIdentifier(),
|
||||
request.hasNameServiceId() ? request.getNameServiceId() : null,
|
||||
PBHelper.convert(request.getNsInfo()));
|
||||
PBHelper.convert(request.getNsInfo()), request.getForce());
|
||||
return FormatResponseProto.getDefaultInstance();
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
|
@ -136,11 +136,13 @@ private JournalIdProto convertJournalId(String jid) {
|
||||
@Override
|
||||
public void format(String jid,
|
||||
String nameServiceId,
|
||||
NamespaceInfo nsInfo) throws IOException {
|
||||
NamespaceInfo nsInfo,
|
||||
boolean force) throws IOException {
|
||||
try {
|
||||
FormatRequestProto.Builder req = FormatRequestProto.newBuilder()
|
||||
.setJid(convertJournalId(jid))
|
||||
.setNsInfo(PBHelper.convert(nsInfo));
|
||||
.setNsInfo(PBHelper.convert(nsInfo))
|
||||
.setForce(force);
|
||||
if(nameServiceId != null) {
|
||||
req.setNameServiceId(nameServiceId);
|
||||
}
|
||||
|
@ -204,10 +204,10 @@ private static void purgeMatching(File dir, List<Pattern> patterns,
|
||||
}
|
||||
}
|
||||
|
||||
void format(NamespaceInfo nsInfo) throws IOException {
|
||||
void format(NamespaceInfo nsInfo, boolean force) throws IOException {
|
||||
unlockAll();
|
||||
try {
|
||||
sd.analyzeStorage(StartupOption.FORMAT, this, true);
|
||||
sd.analyzeStorage(StartupOption.FORMAT, this, !force);
|
||||
} finally {
|
||||
sd.unlock();
|
||||
}
|
||||
|
@ -227,13 +227,13 @@ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException
|
||||
/**
|
||||
* Format the local storage with the given namespace.
|
||||
*/
|
||||
void format(NamespaceInfo nsInfo) throws IOException {
|
||||
void format(NamespaceInfo nsInfo, boolean force) throws IOException {
|
||||
Preconditions.checkState(nsInfo.getNamespaceID() != 0,
|
||||
"can't format with uninitialized namespace info: %s",
|
||||
nsInfo);
|
||||
LOG.info("Formatting journal id : " + journalId + " with namespace info: " +
|
||||
nsInfo);
|
||||
storage.format(nsInfo);
|
||||
nsInfo + " and force: " + force);
|
||||
storage.format(nsInfo, force);
|
||||
refreshCachedData();
|
||||
}
|
||||
|
||||
|
@ -176,9 +176,10 @@ public NewEpochResponseProto newEpoch(String journalId,
|
||||
@Override
|
||||
public void format(String journalId,
|
||||
String nameServiceId,
|
||||
NamespaceInfo nsInfo)
|
||||
NamespaceInfo nsInfo,
|
||||
boolean force)
|
||||
throws IOException {
|
||||
jn.getOrCreateJournal(journalId, nameServiceId).format(nsInfo);
|
||||
jn.getOrCreateJournal(journalId, nameServiceId).format(nsInfo, force);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -42,7 +42,7 @@ class BackupJournalManager implements JournalManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void format(NamespaceInfo nsInfo) {
|
||||
public void format(NamespaceInfo nsInfo, boolean force) {
|
||||
// format() should only get called at startup, before any BNs
|
||||
// can register with the NN.
|
||||
throw new UnsupportedOperationException(
|
||||
|
@ -417,13 +417,14 @@ synchronized void close() {
|
||||
* File-based journals are skipped, since they are formatted by the
|
||||
* Storage format code.
|
||||
*/
|
||||
synchronized void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException {
|
||||
synchronized void formatNonFileJournals(NamespaceInfo nsInfo, boolean force)
|
||||
throws IOException {
|
||||
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
|
||||
"Bad state: %s", state);
|
||||
|
||||
for (JournalManager jm : journalSet.getJournalManagers()) {
|
||||
if (!(jm instanceof FileJournalManager)) {
|
||||
jm.format(nsInfo);
|
||||
jm.format(nsInfo, force);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -160,7 +160,8 @@ protected FSImage(Configuration conf,
|
||||
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
||||
}
|
||||
|
||||
void format(FSNamesystem fsn, String clusterId) throws IOException {
|
||||
void format(FSNamesystem fsn, String clusterId, boolean force)
|
||||
throws IOException {
|
||||
long fileCount = fsn.getFilesTotal();
|
||||
// Expect 1 file, which is the root inode
|
||||
Preconditions.checkState(fileCount == 1,
|
||||
@ -171,7 +172,7 @@ void format(FSNamesystem fsn, String clusterId) throws IOException {
|
||||
ns.clusterID = clusterId;
|
||||
|
||||
storage.format(ns);
|
||||
editLog.formatNonFileJournals(ns);
|
||||
editLog.formatNonFileJournals(ns, force);
|
||||
saveFSImageInAllDirs(fsn, 0);
|
||||
}
|
||||
|
||||
|
@ -1078,8 +1078,8 @@ private void loadFSImage(StartupOption startOpt) throws IOException {
|
||||
|
||||
// format before starting up if requested
|
||||
if (startOpt == StartupOption.FORMAT) {
|
||||
|
||||
fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id
|
||||
// reuse current id
|
||||
fsImage.format(this, fsImage.getStorage().determineClusterId(), false);
|
||||
|
||||
startOpt = StartupOption.REGULAR;
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ public FileJournalManager(Configuration conf, StorageDirectory sd,
|
||||
public void close() throws IOException {}
|
||||
|
||||
@Override
|
||||
public void format(NamespaceInfo ns) throws IOException {
|
||||
public void format(NamespaceInfo ns, boolean force) throws IOException {
|
||||
// Formatting file journals is done by the StorageDirectory
|
||||
// format code, since they may share their directory with
|
||||
// checkpoints, etc.
|
||||
|
@ -43,7 +43,7 @@ public interface JournalManager extends Closeable, FormatConfirmable,
|
||||
* Format the underlying storage, removing any previously
|
||||
* stored data.
|
||||
*/
|
||||
void format(NamespaceInfo ns) throws IOException;
|
||||
void format(NamespaceInfo ns, boolean force) throws IOException;
|
||||
|
||||
/**
|
||||
* Begin writing to a new segment of the log stream, which starts at
|
||||
|
@ -188,7 +188,7 @@ public boolean isShared() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void format(NamespaceInfo nsInfo) throws IOException {
|
||||
public void format(NamespaceInfo nsInfo, boolean force) throws IOException {
|
||||
// The operation is done by FSEditLog itself
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
@ -1159,7 +1159,7 @@ private static boolean format(Configuration conf, boolean force,
|
||||
return true; // aborted
|
||||
}
|
||||
|
||||
fsImage.format(fsn, clusterId);
|
||||
fsImage.format(fsn, clusterId, force);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Encountered exception during format: ", ioe);
|
||||
fsImage.close();
|
||||
@ -1262,7 +1262,7 @@ private static boolean initializeSharedEdits(Configuration conf,
|
||||
// actually want to save a checkpoint - just prime the dirs with
|
||||
// the existing namespace info
|
||||
newSharedStorage.format(nsInfo);
|
||||
sharedEditsImage.getEditLog().formatNonFileJournals(nsInfo);
|
||||
sharedEditsImage.getEditLog().formatNonFileJournals(nsInfo, force);
|
||||
|
||||
// Need to make sure the edit log segments are in good shape to initialize
|
||||
// the shared edits dir.
|
||||
|
@ -242,6 +242,7 @@ message FormatRequestProto {
|
||||
required JournalIdProto jid = 1;
|
||||
required NamespaceInfoProto nsInfo = 2;
|
||||
optional string nameServiceId = 3;
|
||||
optional bool force = 4 [ default = false ];
|
||||
}
|
||||
|
||||
message FormatResponseProto {
|
||||
|
@ -56,7 +56,7 @@ public void testSingleThreaded() throws IOException {
|
||||
QuorumJournalManager qjm = new QuorumJournalManager(
|
||||
conf, uri, FAKE_NSINFO);
|
||||
try {
|
||||
qjm.format(FAKE_NSINFO);
|
||||
qjm.format(FAKE_NSINFO, false);
|
||||
} finally {
|
||||
qjm.close();
|
||||
}
|
||||
|
@ -105,7 +105,7 @@ private static long determineMaxIpcNumber() throws Exception {
|
||||
long ret;
|
||||
try {
|
||||
qjm = createInjectableQJM(cluster);
|
||||
qjm.format(FAKE_NSINFO);
|
||||
qjm.format(FAKE_NSINFO, false);
|
||||
doWorkload(cluster, qjm);
|
||||
|
||||
SortedSet<Integer> ipcCounts = Sets.newTreeSet();
|
||||
@ -156,7 +156,7 @@ public void testRecoverAfterDoubleFailures() throws Exception {
|
||||
QuorumJournalManager qjm = null;
|
||||
try {
|
||||
qjm = createInjectableQJM(cluster);
|
||||
qjm.format(FAKE_NSINFO);
|
||||
qjm.format(FAKE_NSINFO, false);
|
||||
List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
|
||||
failIpcNumber(loggers.get(0), failA);
|
||||
failIpcNumber(loggers.get(1), failB);
|
||||
@ -240,7 +240,7 @@ public void testRandomized() throws Exception {
|
||||
// Format the cluster using a non-faulty QJM.
|
||||
QuorumJournalManager qjmForInitialFormat =
|
||||
createInjectableQJM(cluster);
|
||||
qjmForInitialFormat.format(FAKE_NSINFO);
|
||||
qjmForInitialFormat.format(FAKE_NSINFO, false);
|
||||
qjmForInitialFormat.close();
|
||||
|
||||
try {
|
||||
|
@ -100,7 +100,7 @@ public void setup() throws Exception {
|
||||
qjm = createSpyingQJM();
|
||||
spies = qjm.getLoggerSetForTests().getLoggersForTests();
|
||||
|
||||
qjm.format(QJMTestUtil.FAKE_NSINFO);
|
||||
qjm.format(QJMTestUtil.FAKE_NSINFO, false);
|
||||
qjm.recoverUnfinalizedSegments();
|
||||
assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
|
||||
}
|
||||
@ -149,7 +149,7 @@ public void testFormat() throws Exception {
|
||||
QuorumJournalManager qjm = closeLater(new QuorumJournalManager(
|
||||
conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO));
|
||||
assertFalse(qjm.hasSomeData());
|
||||
qjm.format(FAKE_NSINFO);
|
||||
qjm.format(FAKE_NSINFO, false);
|
||||
assertTrue(qjm.hasSomeData());
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.eq;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -89,7 +90,8 @@ protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
|
||||
NewEpochResponseProto.newBuilder().build()
|
||||
).when(logger).newEpoch(Mockito.anyLong());
|
||||
|
||||
futureReturns(null).when(logger).format(Mockito.<NamespaceInfo>any());
|
||||
futureReturns(null).when(logger).format(Mockito.<NamespaceInfo>any(),
|
||||
anyBoolean());
|
||||
}
|
||||
|
||||
qjm.recoverUnfinalizedSegments();
|
||||
|
@ -73,7 +73,7 @@ public void setup() throws Exception {
|
||||
conf = new Configuration();
|
||||
journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
|
||||
mockErrorReporter);
|
||||
journal.format(FAKE_NSINFO);
|
||||
journal.format(FAKE_NSINFO, false);
|
||||
}
|
||||
|
||||
@After
|
||||
@ -207,7 +207,7 @@ public void testFormatResetsCachedValues() throws Exception {
|
||||
// Clear the storage directory before reformatting it
|
||||
journal.getStorage().getJournalManager()
|
||||
.getStorageDirectory().clearDirectory();
|
||||
journal.format(FAKE_NSINFO_2);
|
||||
journal.format(FAKE_NSINFO_2, false);
|
||||
|
||||
assertEquals(0, journal.getLastPromisedEpoch());
|
||||
assertEquals(0, journal.getLastWriterEpoch());
|
||||
@ -425,7 +425,7 @@ public void testFormatNonEmptyStorageDirectories() throws Exception {
|
||||
try {
|
||||
// Format again here and to format the non-empty directories in
|
||||
// journal node.
|
||||
journal.format(FAKE_NSINFO);
|
||||
journal.format(FAKE_NSINFO, false);
|
||||
fail("Did not fail to format non-empty directories in journal node.");
|
||||
} catch (IOException ioe) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
@ -434,4 +434,15 @@ public void testFormatNonEmptyStorageDirectories() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFormatNonEmptyStorageDirectoriesWhenforceOptionIsTrue()
|
||||
throws Exception {
|
||||
try {
|
||||
// Format again here and to format the non-empty directories in
|
||||
// journal node.
|
||||
journal.format(FAKE_NSINFO, true);
|
||||
} catch (IOException ioe) {
|
||||
fail("Format should be success with force option.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -159,11 +159,11 @@ public void setup() throws Exception {
|
||||
HdfsServerConstants.StartupOption.REGULAR);
|
||||
NamespaceInfo fakeNameSpaceInfo = new NamespaceInfo(
|
||||
12345, "mycluster", "my-bp"+nsId, 0L);
|
||||
journal.format(fakeNameSpaceInfo);
|
||||
journal.format(fakeNameSpaceInfo, false);
|
||||
}
|
||||
} else {
|
||||
journal = jn.getOrCreateJournal(journalId);
|
||||
journal.format(FAKE_NSINFO);
|
||||
journal.format(FAKE_NSINFO, false);
|
||||
}
|
||||
|
||||
|
||||
|
@ -82,7 +82,7 @@ public void testJournalNodeMXBean() throws Exception {
|
||||
// format the journal ns1
|
||||
final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(12345, "mycluster",
|
||||
"my-bp", 0L);
|
||||
jn.getOrCreateJournal(NAMESERVICE).format(FAKE_NSINFO);
|
||||
jn.getOrCreateJournal(NAMESERVICE).format(FAKE_NSINFO, false);
|
||||
|
||||
// check again after format
|
||||
// getJournalsStatus
|
||||
|
@ -341,7 +341,7 @@ public void testSyncAfterJNformat() throws Exception{
|
||||
}
|
||||
|
||||
// Format the JN
|
||||
journal1.format(nsInfo);
|
||||
journal1.format(nsInfo, false);
|
||||
|
||||
// Roll some more edits
|
||||
for (int i = 4; i < 10; i++) {
|
||||
|
@ -155,7 +155,7 @@ public DummyJournalManager(Configuration conf, URI u,
|
||||
}
|
||||
|
||||
@Override
|
||||
public void format(NamespaceInfo nsInfo) throws IOException {
|
||||
public void format(NamespaceInfo nsInfo, boolean force) throws IOException {
|
||||
formatCalled = true;
|
||||
}
|
||||
|
||||
|
@ -249,7 +249,7 @@ void assertCounterInRange(Job job, Enum<?> counter, long min, long max)
|
||||
Counter c = job.getCounters().findCounter(counter);
|
||||
long value = c.getValue();
|
||||
String description =
|
||||
String.format("%s value %s", c.getDisplayName(), value);
|
||||
String.format("%s value %s", c.getDisplayName(), value, false);
|
||||
|
||||
if (min >= 0) {
|
||||
assertTrue(description + " too below minimum " + min,
|
||||
|
Loading…
Reference in New Issue
Block a user