HDFS-2941. Add an administrative command to download a copy of the fsimage from the NN. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1305447 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-03-26 17:02:40 +00:00
parent d5836856a2
commit ce1a7ec975
8 changed files with 269 additions and 27 deletions

View File

@ -11,6 +11,9 @@ Trunk (unreleased changes)
HDFS-234. Integration with BookKeeper logging system. (Ivan Kelly
via jitendra)
HDFS-2941. Add an administrative command to download a copy of the fsimage
from the NN. (atm)
IMPROVEMENTS
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->

View File

@ -30,11 +30,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -258,4 +261,27 @@ public static void cloneDelegationTokenForLogicalUri(
LOG.debug("Mapped HA service delegation token for logical URI " +
haUri + " to namenode " + singleNNAddr);
}
/**
 * Resolve the socket address of the NameNode that is active right now.
 *
 * Use sparingly: the returned address is a snapshot. A caller that connects
 * straight to it will not follow a failover that happens after this method
 * returns, and so may end up unable to reach the active NN.
 *
 * @param fs the file system whose active NameNode address is wanted; must be
 *           a DistributedFileSystem.
 * @return the internet address of the currently-active NN.
 * @throws IOException if an error occurs while resolving the active NN.
 * @throws IllegalArgumentException if fs is not a DistributedFileSystem.
 */
@SuppressWarnings("deprecation")
public static InetSocketAddress getAddressOfActive(FileSystem fs)
    throws IOException {
  if (fs instanceof DistributedFileSystem) {
    // Touch the root path first to force client address resolution.
    fs.exists(new Path("/"));
    DFSClient client = ((DistributedFileSystem) fs).getClient();
    return RPC.getServerAddress(client.getNamenode());
  }
  throw new IllegalArgumentException("FileSystem " + fs + " is not a DFS.");
}
}

View File

@ -57,10 +57,14 @@ public class GetImageServlet extends HttpServlet {
private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
public final static String CONTENT_DISPOSITION = "Content-Disposition";
public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
private static final String TXID_PARAM = "txid";
private static final String START_TXID_PARAM = "startTxId";
private static final String END_TXID_PARAM = "endTxId";
private static final String STORAGEINFO_PARAM = "storageInfo";
private static final String LATEST_FSIMAGE_VALUE = "latest";
private static Set<Long> currentlyDownloadingCheckpoints =
Collections.<Long>synchronizedSet(new HashSet<Long>());
@ -101,10 +105,18 @@ public void doGet(final HttpServletRequest request,
public Void run() throws Exception {
if (parsedParams.isGetImage()) {
long txid = parsedParams.getTxId();
File imageFile = nnImage.getStorage().getFsImageName(txid);
if (imageFile == null) {
throw new IOException("Could not find image with txid " + txid);
File imageFile = null;
String errorMessage = "Could not find image";
if (parsedParams.shouldFetchLatest()) {
imageFile = nnImage.getStorage().getHighestFsImageName();
} else {
errorMessage += " with txid " + txid;
imageFile = nnImage.getStorage().getFsImageName(txid);
}
if (imageFile == null) {
throw new IOException(errorMessage);
}
setFileNameHeaders(response, imageFile);
setVerificationHeaders(response, imageFile);
// send fsImage
TransferFsImage.getFileServer(response.getOutputStream(), imageFile,
@ -117,6 +129,7 @@ public Void run() throws Exception {
.findFinalizedEditsFile(startTxId, endTxId);
setVerificationHeaders(response, editFile);
setFileNameHeaders(response, editFile);
// send edits
TransferFsImage.getFileServer(response.getOutputStream(), editFile,
getThrottler(conf));
@ -182,6 +195,13 @@ private UserGroupInformation reloginIfNecessary() throws IOException {
}
}
/**
 * Advertise the name of the file being served to the client, both as a
 * Content-Disposition attachment name and in the X-Image-Edits-Name header.
 *
 * @param response the servlet response to set the headers on.
 * @param file the file whose simple name is sent.
 */
private static void setFileNameHeaders(HttpServletResponse response,
    File file) {
  String servedName = file.getName();
  response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" + servedName);
  response.setHeader(HADOOP_IMAGE_EDITS_HEADER, servedName);
}
/**
* Construct a throttler from conf
* @param conf configuration
@ -198,7 +218,6 @@ private final DataTransferThrottler getThrottler(Configuration conf) {
return throttler;
}
@SuppressWarnings("deprecation")
protected boolean isValidRequestor(String remoteUser, Configuration conf)
throws IOException {
if(remoteUser == null) { // This really shouldn't happen...
@ -243,13 +262,16 @@ private void setVerificationHeaders(HttpServletResponse response, File file)
response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
}
}
/**
 * @return the query string that asks this servlet for the most recent
 *         fsimage: a getimage request whose txid parameter carries the
 *         "latest" marker value instead of a number.
 */
static String getParamStringForMostRecentImage() {
  StringBuilder query = new StringBuilder("getimage=1&");
  query.append(TXID_PARAM).append("=").append(LATEST_FSIMAGE_VALUE);
  return query.toString();
}
/**
 * Build the query string that asks this servlet for the fsimage with a
 * specific transaction ID.
 *
 * @param txid the transaction ID of the wanted image.
 * @param remoteStorageInfo storage info sent along with the request, in its
 *        colon-separated string form.
 * @return the query string (without a leading '?').
 */
static String getParamStringForImage(long txid,
    StorageInfo remoteStorageInfo) {
  StringBuilder query = new StringBuilder("getimage=1&");
  query.append(TXID_PARAM).append("=").append(txid);
  query.append("&").append(STORAGEINFO_PARAM).append("=");
  query.append(remoteStorageInfo.toColonSeparatedString());
  return query.toString();
}
static String getParamStringForLog(RemoteEditLog log,
@ -280,6 +302,7 @@ static class GetImageParams {
private String machineName;
private long startTxId, endTxId, txId;
private String storageInfoString;
private boolean fetchLatest;
/**
* @param request the object from which this servlet reads the url contents
@ -291,7 +314,7 @@ public GetImageParams(HttpServletRequest request,
) throws IOException {
@SuppressWarnings("unchecked")
Map<String, String[]> pmap = request.getParameterMap();
isGetImage = isGetEdit = isPutImage = false;
isGetImage = isGetEdit = isPutImage = fetchLatest = false;
remoteport = 0;
machineName = null;
@ -300,7 +323,15 @@ public GetImageParams(HttpServletRequest request,
String[] val = entry.getValue();
if (key.equals("getimage")) {
isGetImage = true;
txId = parseLongParam(request, TXID_PARAM);
try {
txId = parseLongParam(request, TXID_PARAM);
} catch (NumberFormatException nfe) {
if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
fetchLatest = true;
} else {
throw nfe;
}
}
} else if (key.equals("getedit")) {
isGetEdit = true;
startTxId = parseLongParam(request, START_TXID_PARAM);
@ -361,6 +392,10 @@ String getInfoServer() throws IOException{
return machineName + ":" + remoteport;
}
/**
 * @return true if the request asked for the most recent fsimage rather than
 *         the image at a specific transaction ID.
 */
boolean shouldFetchLatest() {
  return this.fetchLatest;
}
private static long parseLongParam(HttpServletRequest request, String param)
throws IOException {
// Parse the 'txid' parameter which indicates which image is to be

View File

@ -536,6 +536,10 @@ public File getFsImageName(long txid) {
}
return null;
}
/**
 * Look up the fsimage file that belongs to the most recent checkpoint.
 *
 * @return whatever {@code getFsImageName} returns for the most recent
 *         checkpoint transaction ID.
 */
public File getHighestFsImageName() {
  long newestCheckpointTxId = getMostRecentCheckpointTxId();
  return getFsImageName(newestCheckpointTxId);
}
/** Create new dfs name directory. Caution: this destroys all files
* in this filesystem. */

View File

@ -21,6 +21,7 @@
import java.net.*;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.lang.Math;
@ -51,6 +52,13 @@ public class TransferFsImage {
public final static String MD5_HEADER = "X-MD5-Digest";
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
/**
 * Fetch the most recent fsimage from a NameNode over HTTP and store it
 * under the given local path.
 *
 * @param fsName the NameNode address handed to getFileClient
 *        (presumably host:port of its HTTP server; verify against callers).
 * @param dir the local destination. When it is a directory, the file name
 *        is taken from the header the server sends.
 * @throws IOException if the transfer fails.
 */
public static void downloadMostRecentImageToDirectory(String fsName,
    File dir) throws IOException {
  String latestImageQuery = GetImageServlet.getParamStringForMostRecentImage();
  List<File> destinations = Lists.newArrayList(dir);
  // No NNStorage is involved in this download, so none is passed.
  getFileClient(fsName, latestImageQuery, destinations, null, false);
}
public static MD5Hash downloadImageToStorage(
String fsName, long imageTxId, NNStorage dstStorage, boolean needDigest)
@ -227,6 +235,25 @@ static MD5Hash getFileClient(String nnHostPort,
"by the namenode when trying to fetch " + str);
}
if (localPaths != null) {
String fsImageName = connection.getHeaderField(
GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
// If the local paths refer to directories, use the server-provided header
// as the filename within that directory
List<File> newLocalPaths = new ArrayList<File>();
for (File localPath : localPaths) {
if (localPath.isDirectory()) {
if (fsImageName == null) {
throw new IOException("No filename header provided by server");
}
newLocalPaths.add(new File(localPath, fsImageName));
} else {
newLocalPaths.add(localPath);
}
}
localPaths = newLocalPaths;
}
MD5Hash advertisedDigest = parseMD5Header(connection);
long received = 0;
@ -251,7 +278,11 @@ static MD5Hash getFileClient(String nnHostPort,
outputStreams.add(new FileOutputStream(f));
} catch (IOException ioe) {
LOG.warn("Unable to download file " + f, ioe);
dstStorage.reportErrorOnFile(f);
// This will be null if we're downloading the fsimage to a file
// outside of an NNStorage directory.
if (dstStorage != null) {
dstStorage.reportErrorOnFile(f);
}
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.tools;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -37,6 +38,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -46,6 +48,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
@ -488,6 +491,25 @@ public int setBalancerBandwidth(String[] argv, int idx) throws IOException {
return exitCode;
}
/**
 * Download the most recent fsimage from the currently-active name node and
 * save it to a local file in the given directory.
 *
 * @param argv
 *          List of command line parameters.
 * @param idx
 *          The index in argv of the local directory argument.
 * @return an exit code indicating success or failure; 0 when the download
 *         completes without throwing.
 * @throws IOException
 *           if resolving the name node or transferring the image fails.
 */
public int fetchImage(String[] argv, int idx) throws IOException {
  InetSocketAddress activeNnAddress = HAUtil.getAddressOfActive(getDFS());
  String infoServer = DFSUtil.getInfoServer(activeNnAddress, getConf(), true);
  File targetDirectory = new File(argv[idx]);
  TransferFsImage.downloadMostRecentImageToDirectory(infoServer,
      targetDirectory);
  return 0;
}
private void printHelp(String cmd) {
String summary = "hadoop dfsadmin is the command to execute DFS administrative commands.\n" +
"The full syntax is: \n\n" +
@ -506,6 +528,7 @@ private void printHelp(String cmd) {
"\t[-refreshNamenodes datanodehost:port]\n"+
"\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
"\t[-setBalancerBandwidth <bandwidth>]\n" +
"\t[-fetchImage <local directory>]\n" +
"\t[-help [cmd]]\n";
String report ="-report: \tReports basic filesystem information and statistics.\n";
@ -592,6 +615,10 @@ private void printHelp(String cmd) {
"\t\tthe dfs.balance.bandwidthPerSec parameter.\n\n" +
"\t\t--- NOTE: The new value is not persistent on the DataNode.---\n";
String fetchImage = "-fetchImage <local directory>:\n" +
"\tDownloads the most recent fsimage from the Name Node and saves it in" +
"\tthe specified local directory.\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
@ -633,6 +660,8 @@ private void printHelp(String cmd) {
System.out.println(deleteBlockPool);
} else if ("setBalancerBandwidth".equals(cmd)) {
System.out.println(setBalancerBandwidth);
} else if ("fetchImage".equals(cmd)) {
System.out.println(fetchImage);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
@ -655,6 +684,8 @@ private void printHelp(String cmd) {
System.out.println(printTopology);
System.out.println(refreshNamenodes);
System.out.println(deleteBlockPool);
System.out.println(setBalancerBandwidth);
System.out.println(fetchImage);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
@ -917,6 +948,9 @@ private static void printUsage(String cmd) {
} else if ("-setBalancerBandwidth".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-setBalancerBandwidth <bandwidth in bytes per second>]");
} else if ("-fetchImage".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-fetchImage <local directory>]");
} else {
System.err.println("Usage: java DFSAdmin");
System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
@ -939,6 +973,7 @@ private static void printUsage(String cmd) {
System.err.println(" ["+SetSpaceQuotaCommand.USAGE+"]");
System.err.println(" ["+ClearSpaceQuotaCommand.USAGE+"]");
System.err.println(" [-setBalancerBandwidth <bandwidth in bytes per second>]");
System.err.println(" [-fetchImage <local directory>]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
@ -1035,6 +1070,11 @@ public int run(String[] argv) throws Exception {
printUsage(cmd);
return exitCode;
}
} else if ("-fetchImage".equals(cmd)) {
if (argv.length != 2) {
printUsage(cmd);
return exitCode;
}
}
// initialize DFSAdmin
@ -1089,6 +1129,8 @@ public int run(String[] argv) throws Exception {
exitCode = deleteBlockPool(argv, i);
} else if ("-setBalancerBandwidth".equals(cmd)) {
exitCode = setBalancerBandwidth(argv, i);
} else if ("-fetchImage".equals(cmd)) {
exitCode = fetchImage(argv, i);
} else if ("-help".equals(cmd)) {
if (i < argv.length) {
printHelp(argv[i]);

View File

@ -22,7 +22,6 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
@ -32,14 +31,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@ -226,20 +222,7 @@ private String getCurrentNamenodeAddress() throws IOException {
return null;
}
// force client address resolution.
fs.exists(new Path("/"));
// Derive the nameservice ID from the filesystem connection. The URI may
// have been provided by a human, the server name may be aliased, or there
// may be multiple possible actual addresses (e.g. in an HA setup) so
// compare InetSocketAddresses instead of URI strings, and test against both
// possible configurations of RPC address (DFS_NAMENODE_RPC_ADDRESS_KEY and
// DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY).
DistributedFileSystem dfs = (DistributedFileSystem) fs;
DFSClient dfsClient = dfs.getClient();
InetSocketAddress addr = RPC.getServerAddress(dfsClient.getNamenode());
return DFSUtil.getInfoServer(addr, conf, true);
return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf, true);
}
private int doWork(final String[] args) throws IOException {

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.junit.Test;
/**
 * Verifies that `hdfs dfsadmin -fetchImage ...' downloads a byte-identical
 * copy of the newest fsimage held by the NameNode of a MiniDFSCluster.
 */
public class TestFetchImage {

  /** Local directory that the fetched fsimage files are written into. */
  private static final File FETCHED_IMAGE_FILE = new File(
      System.getProperty("build.test.dir"), "fetched-image-dir");

  // Shamelessly stolen from NNStorage.
  private static final Pattern IMAGE_REGEX = Pattern.compile("fsimage_(\\d+)");

  /**
   * Download a few fsimages using `hdfs dfsadmin -fetchImage ...' and verify
   * the results.
   */
  @Test
  public void testFetchImage() throws Exception {
    // mkdirs() also returns false when the directory already exists, so only
    // fail when the target is still not a directory afterwards. Without this
    // check a failed creation would show up later as a missing-file error.
    if (!FETCHED_IMAGE_FILE.mkdirs() && !FETCHED_IMAGE_FILE.isDirectory()) {
      throw new IllegalStateException(
          "Unable to create directory " + FETCHED_IMAGE_FILE);
    }

    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    FileSystem fs = null;
    try {
      DFSAdmin dfsAdmin = new DFSAdmin();
      dfsAdmin.setConf(conf);

      // First fetch: the image present right after cluster start-up.
      runFetchImage(dfsAdmin, cluster);

      fs = cluster.getFileSystem();
      fs.mkdirs(new Path("/foo"));
      fs.mkdirs(new Path("/foo2"));
      fs.mkdirs(new Path("/foo3"));

      // Save the namespace (done in safe mode) so that a newer fsimage
      // exists, then fetch again.
      cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
      cluster.getNameNodeRpc().saveNamespace();
      cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);

      runFetchImage(dfsAdmin, cluster);
    } finally {
      if (fs != null) {
        fs.close();
      }
      // cluster is assigned before the try block, so it is never null here.
      cluster.shutdown();
    }
  }

  /**
   * Run `hdfs dfsadmin -fetchImage ...' and verify that the downloaded image is
   * correct.
   *
   * @param dfsAdmin the admin tool instance used to run the command.
   * @param cluster the cluster whose newest on-disk fsimage is the reference.
   */
  private static void runFetchImage(DFSAdmin dfsAdmin, MiniDFSCluster cluster)
      throws Exception {
    int retVal = dfsAdmin.run(new String[]{"-fetchImage",
        FETCHED_IMAGE_FILE.getPath() });

    assertEquals(0, retVal);

    File highestImageOnNn = getHighestFsImageOnCluster(cluster);
    MD5Hash expected = MD5FileUtils.computeMd5ForFile(highestImageOnNn);
    MD5Hash actual = MD5FileUtils.computeMd5ForFile(
        new File(FETCHED_IMAGE_FILE, highestImageOnNn.getName()));

    assertEquals(expected, actual);
  }

  /**
   * @return the fsimage with highest transaction ID in the cluster, or null
   *         if no file matching the fsimage name pattern was found.
   */
  private static File getHighestFsImageOnCluster(MiniDFSCluster cluster) {
    long highestImageTxId = -1;
    File highestImageOnNn = null;
    for (URI nameDir : cluster.getNameDirs(0)) {
      File[] candidates = new File(new File(nameDir), "current").listFiles();
      if (candidates == null) {
        // listFiles() yields null when the path is not a readable directory.
        continue;
      }
      for (File imageFile : candidates) {
        Matcher imageMatch = IMAGE_REGEX.matcher(imageFile.getName());
        if (imageMatch.matches()) {
          // parseLong avoids the needless boxing of Long.valueOf.
          long imageTxId = Long.parseLong(imageMatch.group(1));
          if (imageTxId > highestImageTxId) {
            highestImageTxId = imageTxId;
            highestImageOnNn = imageFile;
          }
        }
      }
    }
    return highestImageOnNn;
  }
}