Merged r1185354 from trunk for HDFS-2188.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1185363 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7ca7832158
commit
ab7e7dfd7e
@ -97,6 +97,9 @@ Trunk (unreleased changes)
|
|||||||
HDFS-2298. Fix TestDfsOverAvroRpc by changing ClientProtocol to
|
HDFS-2298. Fix TestDfsOverAvroRpc by changing ClientProtocol to
|
||||||
not include multiple methods of the same name. (cutting)
|
not include multiple methods of the same name. (cutting)
|
||||||
|
|
||||||
|
HDFS-2188. Make FSEditLog create its journals from a list of URIs rather
|
||||||
|
than NNStorage. (Ivan Kelly via jitendra)
|
||||||
|
|
||||||
Release 0.23.0 - Unreleased
|
Release 0.23.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -18,10 +18,11 @@
|
|||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
||||||
|
import java.net.URI;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
@ -42,9 +43,11 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* FSEditLog maintains a log of the namespace modifications.
|
* FSEditLog maintains a log of the namespace modifications.
|
||||||
@ -122,16 +125,53 @@ protected synchronized TransactionId initialValue() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
final private Collection<URI> editsDirs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct FSEditLog with default configuration, taking editDirs from NNStorage
|
||||||
|
* @param storage Storage object used by namenode
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
FSEditLog(NNStorage storage) {
|
FSEditLog(NNStorage storage) {
|
||||||
|
this(new Configuration(), storage, Collections.<URI>emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor for FSEditLog. Add underlying journals are constructed, but
|
||||||
|
* no streams are opened until open() is called.
|
||||||
|
*
|
||||||
|
* @param conf The namenode configuration
|
||||||
|
* @param storage Storage object used by namenode
|
||||||
|
* @param editsDirs List of journals to use
|
||||||
|
*/
|
||||||
|
FSEditLog(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
|
||||||
isSyncRunning = false;
|
isSyncRunning = false;
|
||||||
this.storage = storage;
|
this.storage = storage;
|
||||||
metrics = NameNode.getNameNodeMetrics();
|
metrics = NameNode.getNameNodeMetrics();
|
||||||
lastPrintTime = now();
|
lastPrintTime = now();
|
||||||
|
|
||||||
|
if (editsDirs.isEmpty()) {
|
||||||
|
// if this is the case, no edit dirs have been explictly configured
|
||||||
|
// image dirs are to be used for edits too
|
||||||
|
try {
|
||||||
|
editsDirs = Lists.newArrayList(storage.getEditsDirectories());
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// cannot get list from storage, so the empty editsDirs
|
||||||
|
// will be assigned. an error will be thrown on first use
|
||||||
|
// of the editlog, as no journals will exist
|
||||||
|
}
|
||||||
|
this.editsDirs = editsDirs;
|
||||||
|
} else {
|
||||||
|
this.editsDirs = Lists.newArrayList(editsDirs);
|
||||||
|
}
|
||||||
|
|
||||||
this.journalSet = new JournalSet();
|
this.journalSet = new JournalSet();
|
||||||
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
|
for (URI u : this.editsDirs) {
|
||||||
|
StorageDirectory sd = storage.getStorageDirectory(u);
|
||||||
|
if (sd != null) {
|
||||||
journalSet.add(new FileJournalManager(sd));
|
journalSet.add(new FileJournalManager(sd));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (journalSet.isEmpty()) {
|
if (journalSet.isEmpty()) {
|
||||||
LOG.error("No edits directories configured!");
|
LOG.error("No edits directories configured!");
|
||||||
@ -139,6 +179,14 @@ protected synchronized TransactionId initialValue() {
|
|||||||
state = State.BETWEEN_LOG_SEGMENTS;
|
state = State.BETWEEN_LOG_SEGMENTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list of URIs the editlog is using for storage
|
||||||
|
* @return collection of URIs in use by the edit log
|
||||||
|
*/
|
||||||
|
Collection<URI> getEditURIs() {
|
||||||
|
return editsDirs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the output stream for logging, opening the first
|
* Initialize the output stream for logging, opening the first
|
||||||
* log segment.
|
* log segment.
|
||||||
|
@ -120,7 +120,7 @@ protected FSImage(Configuration conf,
|
|||||||
storage.setRestoreFailedStorage(true);
|
storage.setRestoreFailedStorage(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.editLog = new FSEditLog(storage);
|
this.editLog = new FSEditLog(conf, storage, editsDirs);
|
||||||
|
|
||||||
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
||||||
}
|
}
|
||||||
@ -150,8 +150,7 @@ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
|
|||||||
"NameNode formatting should be performed before reading the image";
|
"NameNode formatting should be performed before reading the image";
|
||||||
|
|
||||||
Collection<URI> imageDirs = storage.getImageDirectories();
|
Collection<URI> imageDirs = storage.getImageDirectories();
|
||||||
Collection<URI> editsDirs = storage.getEditsDirectories();
|
Collection<URI> editsDirs = editLog.getEditURIs();
|
||||||
|
|
||||||
|
|
||||||
// none of the data dirs exist
|
// none of the data dirs exist
|
||||||
if((imageDirs.size() == 0 || editsDirs.size() == 0)
|
if((imageDirs.size() == 0 || editsDirs.size() == 0)
|
||||||
|
@ -59,6 +59,7 @@
|
|||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* NNStorage is responsible for management of the StorageDirectories used by
|
* NNStorage is responsible for management of the StorageDirectories used by
|
||||||
@ -154,7 +155,9 @@ public NNStorage(Configuration conf,
|
|||||||
|
|
||||||
storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
|
storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
|
||||||
|
|
||||||
setStorageDirectories(imageDirs, editsDirs);
|
// this may modify the editsDirs, so copy before passing in
|
||||||
|
setStorageDirectories(imageDirs,
|
||||||
|
Lists.newArrayList(editsDirs));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // Storage
|
@Override // Storage
|
||||||
@ -298,6 +301,27 @@ synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the storage directory corresponding to the passed URI
|
||||||
|
* @param uri URI of a storage directory
|
||||||
|
* @return The matching storage directory or null if none found
|
||||||
|
*/
|
||||||
|
StorageDirectory getStorageDirectory(URI uri) {
|
||||||
|
try {
|
||||||
|
uri = Util.fileAsURI(new File(uri));
|
||||||
|
Iterator<StorageDirectory> it = dirIterator();
|
||||||
|
for (; it.hasNext(); ) {
|
||||||
|
StorageDirectory sd = it.next();
|
||||||
|
if (Util.fileAsURI(sd.getRoot()).equals(uri)) {
|
||||||
|
return sd;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Error converting file to URI", ioe);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks the consistency of a URI, in particular if the scheme
|
* Checks the consistency of a URI, in particular if the scheme
|
||||||
* is specified and is supported by a concrete implementation
|
* is specified and is supported by a concrete implementation
|
||||||
|
@ -41,10 +41,13 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||||
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.Matchers;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
@ -105,7 +108,7 @@ public static StorageDirectory mockStorageDirectory(
|
|||||||
Mockito.doReturn(type)
|
Mockito.doReturn(type)
|
||||||
.when(sd).getStorageDirType();
|
.when(sd).getStorageDirType();
|
||||||
Mockito.doReturn(currentDir).when(sd).getCurrentDir();
|
Mockito.doReturn(currentDir).when(sd).getCurrentDir();
|
||||||
|
Mockito.doReturn(currentDir).when(sd).getRoot();
|
||||||
Mockito.doReturn(mockFile(true)).when(sd).getVersionFile();
|
Mockito.doReturn(mockFile(true)).when(sd).getVersionFile();
|
||||||
Mockito.doReturn(mockFile(false)).when(sd).getPreviousDir();
|
Mockito.doReturn(mockFile(false)).when(sd).getPreviousDir();
|
||||||
return sd;
|
return sd;
|
||||||
@ -127,6 +130,7 @@ static StorageDirectory mockStorageDirectory(
|
|||||||
|
|
||||||
// Version file should always exist
|
// Version file should always exist
|
||||||
doReturn(mockFile(true)).when(sd).getVersionFile();
|
doReturn(mockFile(true)).when(sd).getVersionFile();
|
||||||
|
doReturn(mockFile(true)).when(sd).getRoot();
|
||||||
|
|
||||||
// Previous dir optionally exists
|
// Previous dir optionally exists
|
||||||
doReturn(mockFile(previousExists))
|
doReturn(mockFile(previousExists))
|
||||||
@ -142,6 +146,7 @@ static StorageDirectory mockStorageDirectory(
|
|||||||
doReturn(files).when(mockDir).listFiles();
|
doReturn(files).when(mockDir).listFiles();
|
||||||
doReturn(mockDir).when(sd).getCurrentDir();
|
doReturn(mockDir).when(sd).getCurrentDir();
|
||||||
|
|
||||||
|
|
||||||
return sd;
|
return sd;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,11 +174,16 @@ public static FSEditLog createStandaloneEditLog(File logDir)
|
|||||||
assertTrue(logDir.mkdirs() || logDir.exists());
|
assertTrue(logDir.mkdirs() || logDir.exists());
|
||||||
Files.deleteDirectoryContents(logDir);
|
Files.deleteDirectoryContents(logDir);
|
||||||
NNStorage storage = Mockito.mock(NNStorage.class);
|
NNStorage storage = Mockito.mock(NNStorage.class);
|
||||||
List<StorageDirectory> sds = Lists.newArrayList(
|
StorageDirectory sd
|
||||||
FSImageTestUtil.mockStorageDirectory(logDir, NameNodeDirType.EDITS));
|
= FSImageTestUtil.mockStorageDirectory(logDir, NameNodeDirType.EDITS);
|
||||||
|
List<StorageDirectory> sds = Lists.newArrayList(sd);
|
||||||
Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
|
Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
|
||||||
|
Mockito.doReturn(sd).when(storage)
|
||||||
|
.getStorageDirectory(Matchers.<URI>anyObject());
|
||||||
|
|
||||||
return new FSEditLog(storage);
|
return new FSEditLog(new Configuration(),
|
||||||
|
storage,
|
||||||
|
ImmutableList.of(logDir.toURI()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -46,6 +46,7 @@
|
|||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
|
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||||
@ -861,8 +862,11 @@ public void testEditLogManifestMocks() throws IOException {
|
|||||||
* The syntax <code>[1,]</code> specifies an in-progress log starting at
|
* The syntax <code>[1,]</code> specifies an in-progress log starting at
|
||||||
* txid 1.
|
* txid 1.
|
||||||
*/
|
*/
|
||||||
private NNStorage mockStorageWithEdits(String... editsDirSpecs) {
|
private NNStorage mockStorageWithEdits(String... editsDirSpecs) throws IOException {
|
||||||
List<StorageDirectory> sds = Lists.newArrayList();
|
List<StorageDirectory> sds = Lists.newArrayList();
|
||||||
|
List<URI> uris = Lists.newArrayList();
|
||||||
|
|
||||||
|
NNStorage storage = Mockito.mock(NNStorage.class);
|
||||||
for (String dirSpec : editsDirSpecs) {
|
for (String dirSpec : editsDirSpecs) {
|
||||||
List<String> files = Lists.newArrayList();
|
List<String> files = Lists.newArrayList();
|
||||||
String[] logSpecs = dirSpec.split("\\|");
|
String[] logSpecs = dirSpec.split("\\|");
|
||||||
@ -878,13 +882,17 @@ private NNStorage mockStorageWithEdits(String... editsDirSpecs) {
|
|||||||
Long.valueOf(m.group(2))));
|
Long.valueOf(m.group(2))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sds.add(FSImageTestUtil.mockStorageDirectory(
|
StorageDirectory sd = FSImageTestUtil.mockStorageDirectory(
|
||||||
NameNodeDirType.EDITS, false,
|
NameNodeDirType.EDITS, false,
|
||||||
files.toArray(new String[0])));
|
files.toArray(new String[0]));
|
||||||
|
sds.add(sd);
|
||||||
|
URI u = URI.create("file:///storage"+ Math.random());
|
||||||
|
Mockito.doReturn(sd).when(storage).getStorageDirectory(u);
|
||||||
|
uris.add(u);
|
||||||
}
|
}
|
||||||
|
|
||||||
NNStorage storage = Mockito.mock(NNStorage.class);
|
|
||||||
Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
|
Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
|
||||||
|
Mockito.doReturn(uris).when(storage).getEditsDirectories();
|
||||||
return storage;
|
return storage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user