diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3476d08785..d389de68db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -122,6 +122,9 @@ Trunk (Unreleased) HDFS-5138. Support HDFS upgrade in HA. (atm via todd) + HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET + to send merged fsimages. (Vinayakumar B via wang) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8dc17fc371..378c2cd0ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -460,7 +460,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Image transfer timeout public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout"; - public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000; + public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000; + + // Image transfer chunksize + public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize"; + public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024; //Keys with no defaults public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java index 2571670693..e9387d7a0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; -import org.apache.hadoop.hdfs.server.namenode.GetImageServlet; +import org.apache.hadoop.hdfs.server.namenode.ImageServlet; import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -205,15 +205,16 @@ public void doGet(final HttpServletRequest request, return; } editFile = elf.getFile(); - GetImageServlet.setVerificationHeaders(response, editFile); - GetImageServlet.setFileNameHeaders(response, editFile); + ImageServlet.setVerificationHeadersForGet(response, editFile); + ImageServlet.setFileNameHeaders(response, editFile); editFileIn = new FileInputStream(editFile); } - DataTransferThrottler throttler = GetImageServlet.getThrottler(conf); + DataTransferThrottler throttler = ImageServlet.getThrottler(conf); // send edits - TransferFsImage.getFileServer(response, editFile, editFileIn, throttler); + TransferFsImage.copyFileToStream(response.getOutputStream(), editFile, + editFileIn, throttler); } catch (Throwable t) { String errMsg = "getedit failed. " + StringUtils.stringifyException(t); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java index ca2d07015f..c66574cdca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java @@ -263,8 +263,7 @@ void doCheckpoint() throws IOException { } if(cpCmd.needToReturnImage()) { - TransferFsImage.uploadImageFromStorage( - backupNode.nnHttpAddress, getImageListenAddress(), + TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf, bnStorage, NameNodeFile.IMAGE, txid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java similarity index 57% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java index 3c23443465..b1f127de0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java @@ -19,11 +19,10 @@ import static org.apache.hadoop.util.Time.now; +import java.net.HttpURLConnection; import java.security.PrivilegedExceptionAction; import java.util.*; import java.io.*; -import java.net.InetSocketAddress; -import java.net.URL; import javax.servlet.ServletContext; import javax.servlet.ServletException; @@ -32,7 +31,6 @@ import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,18 +55,21 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.net.InetAddresses; /** - * This class is used in Namesystem's jetty to retrieve a file. + * This class is used in Namesystem's jetty to retrieve/upload a file * Typically used by the Secondary NameNode to retrieve image and - * edit file for periodic checkpointing. + * edit file for periodic checkpointing in Non-HA deployments. + * Standby NameNode uses to upload checkpoints in HA deployments. */ @InterfaceAudience.Private -public class GetImageServlet extends HttpServlet { +public class ImageServlet extends HttpServlet { + + public static final String PATH_SPEC = "/imagetransfer"; + private static final long serialVersionUID = -7669068179452648952L; - private static final Log LOG = LogFactory.getLog(GetImageServlet.class); + private static final Log LOG = LogFactory.getLog(ImageServlet.class); public final static String CONTENT_DISPOSITION = "Content-Disposition"; public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name"; @@ -85,8 +86,7 @@ public class GetImageServlet extends HttpServlet { @Override public void doGet(final HttpServletRequest request, - final HttpServletResponse response - ) throws ServletException, IOException { + final HttpServletResponse response) throws ServletException, IOException { try { final ServletContext context = getServletContext(); final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context); @@ -94,29 +94,10 @@ public void doGet(final HttpServletRequest request, final Configuration conf = (Configuration) context .getAttribute(JspHelper.CURRENT_CONF); final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); - - if (UserGroupInformation.isSecurityEnabled() && - !isValidRequestor(context, request.getUserPrincipal().getName(), conf)) { - response.sendError(HttpServletResponse.SC_FORBIDDEN, - "Only Namenode, Secondary Namenode, and administrators may access " + - "this servlet"); - LOG.warn("Received non-NN/SNN/administrator request for image or edits from " - + request.getUserPrincipal().getName() + " at " + request.getRemoteHost()); - return; - } - - String myStorageInfoString = nnImage.getStorage().toColonSeparatedString(); - String theirStorageInfoString = parsedParams.getStorageInfoString(); - if (theirStorageInfoString != null && - !myStorageInfoString.equals(theirStorageInfoString)) { - response.sendError(HttpServletResponse.SC_FORBIDDEN, - "This namenode has storage info " + myStorageInfoString + - " but the secondary expected " + theirStorageInfoString); - LOG.warn("Received an invalid request file transfer request " + - "from a secondary with storage info " + theirStorageInfoString); - return; - } - + + validateRequest(context, conf, request, response, nnImage, + parsedParams.getStorageInfoString()); + UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { @@ -155,53 +136,6 @@ public Void run() throws Exception { long elapsed = now() - start; metrics.addGetEdit(elapsed); } - } else if (parsedParams.isPutImage()) { - final long txid = parsedParams.getTxId(); - final NameNodeFile nnf = parsedParams.getNameNodeFile(); - - if (! currentlyDownloadingCheckpoints.add(txid)) { - response.sendError(HttpServletResponse.SC_CONFLICT, - "Another checkpointer is already in the process of uploading a" + - " checkpoint made at transaction ID " + txid); - return null; - } - - try { - if (nnImage.getStorage().findImageFile(nnf, txid) != null) { - response.sendError(HttpServletResponse.SC_CONFLICT, - "Another checkpointer already uploaded an checkpoint " + - "for txid " + txid); - return null; - } - - // We may have lost our ticket since last checkpoint, log in again, just in case - if (UserGroupInformation.isSecurityEnabled()) { - UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); - } - - long start = now(); - // issue a HTTP get request to download the new fsimage - MD5Hash downloadImageDigest = TransferFsImage - .downloadImageToStorage(parsedParams.getInfoServer(conf), - txid, nnImage.getStorage(), true); - nnImage.saveDigestAndRenameCheckpointImage(nnf, txid, - downloadImageDigest); - if (nnf == NameNodeFile.IMAGE_ROLLBACK) { - NameNodeHttpServer.getNameNodeFromContext(context) - .getNamesystem().setCreatedRollbackImages(true); - } - - if (metrics != null) { // Metrics non-null only when used inside name node - long elapsed = now() - start; - metrics.addPutImage(elapsed); - } - - // Now that we have a new checkpoint, we might be able to - // remove some old ones. - nnImage.purgeOldStorage(nnf); - } finally { - currentlyDownloadingCheckpoints.remove(txid); - } } return null; } @@ -209,7 +143,7 @@ public Void run() throws Exception { private void serveFile(File file) throws IOException { FileInputStream fis = new FileInputStream(file); try { - setVerificationHeaders(response, file); + setVerificationHeadersForGet(response, file); setFileNameHeaders(response, file); if (!file.exists()) { // Potential race where the file was deleted while we were in the @@ -221,8 +155,8 @@ private void serveFile(File file) throws IOException { // detected by the client side as an inaccurate length header. } // send file - TransferFsImage.getFileServer(response, file, fis, - getThrottler(conf)); + TransferFsImage.copyFileToStream(response.getOutputStream(), + file, fis, getThrottler(conf)); } finally { IOUtils.closeStream(fis); } @@ -237,7 +171,36 @@ private void serveFile(File file) throws IOException { response.getOutputStream().close(); } } - + + private void validateRequest(ServletContext context, Configuration conf, + HttpServletRequest request, HttpServletResponse response, + FSImage nnImage, String theirStorageInfoString) throws IOException { + + if (UserGroupInformation.isSecurityEnabled() + && !isValidRequestor(context, request.getUserPrincipal().getName(), + conf)) { + String errorMsg = "Only Namenode, Secondary Namenode, and administrators may access " + + "this servlet"; + response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg); + LOG.warn("Received non-NN/SNN/administrator request for image or edits from " + + request.getUserPrincipal().getName() + + " at " + + request.getRemoteHost()); + throw new IOException(errorMsg); + } + + String myStorageInfoString = nnImage.getStorage().toColonSeparatedString(); + if (theirStorageInfoString != null + && !myStorageInfoString.equals(theirStorageInfoString)) { + String errorMsg = "This namenode has storage info " + myStorageInfoString + + " but the secondary expected " + theirStorageInfoString; + response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg); + LOG.warn("Received an invalid request file transfer request " + + "from a secondary with storage info " + theirStorageInfoString); + throw new IOException(errorMsg); + } + } + public static void setFileNameHeaders(HttpServletResponse response, File file) { response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" + @@ -264,43 +227,40 @@ public final static DataTransferThrottler getThrottler(Configuration conf) { @VisibleForTesting static boolean isValidRequestor(ServletContext context, String remoteUser, Configuration conf) throws IOException { - if(remoteUser == null) { // This really shouldn't happen... + if (remoteUser == null) { // This really shouldn't happen... LOG.warn("Received null remoteUser while authorizing access to getImage servlet"); return false; } - + Set validRequestors = new HashSet(); - validRequestors.add( - SecurityUtil.getServerPrincipal(conf - .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode - .getAddress(conf).getHostName())); - validRequestors.add( - SecurityUtil.getServerPrincipal(conf - .get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY), - SecondaryNameNode.getHttpAddress(conf).getHostName())); + validRequestors.add(SecurityUtil.getServerPrincipal(conf + .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), + NameNode.getAddress(conf).getHostName())); + validRequestors.add(SecurityUtil.getServerPrincipal( + conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY), + SecondaryNameNode.getHttpAddress(conf).getHostName())); if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) { Configuration otherNnConf = HAUtil.getConfForOtherNode(conf); - validRequestors.add( - SecurityUtil.getServerPrincipal(otherNnConf - .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), - NameNode.getAddress(otherNnConf).getHostName())); + validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf + .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), + NameNode.getAddress(otherNnConf).getHostName())); } - for(String v : validRequestors) { - if(v != null && v.equals(remoteUser)) { - LOG.info("GetImageServlet allowing checkpointer: " + remoteUser); + for (String v : validRequestors) { + if (v != null && v.equals(remoteUser)) { + LOG.info("ImageServlet allowing checkpointer: " + remoteUser); return true; } } - + if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) { - LOG.info("GetImageServlet allowing administrator: " + remoteUser); + LOG.info("ImageServlet allowing administrator: " + remoteUser); return true; } - - LOG.info("GetImageServlet rejecting: " + remoteUser); + + LOG.info("ImageServlet rejecting: " + remoteUser); return false; } @@ -308,8 +268,8 @@ static boolean isValidRequestor(ServletContext context, String remoteUser, * Set headers for content length, and, if available, md5. * @throws IOException */ - public static void setVerificationHeaders(HttpServletResponse response, File file) - throws IOException { + public static void setVerificationHeadersForGet(HttpServletResponse response, + File file) throws IOException { response.setHeader(TransferFsImage.CONTENT_LENGTH, String.valueOf(file.length())); MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file); @@ -339,30 +299,10 @@ static String getParamStringForLog(RemoteEditLog log, + "&" + STORAGEINFO_PARAM + "=" + remoteStorageInfo.toColonSeparatedString(); } - - static String getParamStringToPutImage(NameNodeFile nnf, long txid, - URL url, Storage storage) { - InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url - .getAuthority()); - String machine = !imageListenAddress.isUnresolved() - && imageListenAddress.getAddress().isAnyLocalAddress() ? null - : imageListenAddress.getHostName(); - return "putimage=1" + - "&" + TXID_PARAM + "=" + txid + - "&" + IMAGE_FILE_TYPE + "=" + nnf.name() + - "&port=" + imageListenAddress.getPort() + - (machine != null ? "&machine=" + machine : "") - + "&" + STORAGEINFO_PARAM + "=" + - storage.toColonSeparatedString(); - } - static class GetImageParams { private boolean isGetImage; private boolean isGetEdit; - private boolean isPutImage; - private int remoteport; - private String machineName; private NameNodeFile nnf; private long startTxId, endTxId, txId; private String storageInfoString; @@ -378,8 +318,7 @@ public GetImageParams(HttpServletRequest request, ) throws IOException { @SuppressWarnings("unchecked") Map pmap = request.getParameterMap(); - isGetImage = isGetEdit = isPutImage = fetchLatest = false; - remoteport = 0; + isGetImage = isGetEdit = fetchLatest = false; for (Map.Entry entry : pmap.entrySet()) { String key = entry.getKey(); @@ -402,30 +341,13 @@ public GetImageParams(HttpServletRequest request, isGetEdit = true; startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM); endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM); - } else if (key.equals("putimage")) { - isPutImage = true; - txId = ServletUtil.parseLongParam(request, TXID_PARAM); - String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE); - nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile - .valueOf(imageType); - } else if (key.equals("port")) { - remoteport = new Integer(val[0]).intValue(); - } else if (key.equals("machine")) { - machineName = val[0]; } else if (key.equals(STORAGEINFO_PARAM)) { storageInfoString = val[0]; } } - if (machineName == null) { - machineName = request.getRemoteHost(); - if (InetAddresses.isInetAddress(machineName)) { - machineName = NetUtils.getHostNameOfIP(machineName); - } - } - int numGets = (isGetImage?1:0) + (isGetEdit?1:0); - if ((numGets > 1) || (numGets == 0) && !isPutImage) { + if ((numGets > 1) || (numGets == 0)) { throw new IOException("Illegal parameters to TransferFsImage"); } } @@ -435,12 +357,12 @@ public String getStorageInfoString() { } public long getTxId() { - Preconditions.checkState(isGetImage || isPutImage); + Preconditions.checkState(isGetImage); return txId; } public NameNodeFile getNameNodeFile() { - Preconditions.checkState(isPutImage || isGetImage); + Preconditions.checkState(isGetImage); return nnf; } @@ -462,20 +384,161 @@ boolean isGetImage() { return isGetImage; } - boolean isPutImage() { - return isPutImage; - } - - URL getInfoServer(Configuration conf) throws IOException { - if (machineName == null || remoteport == 0) { - throw new IOException("MachineName and port undefined"); - } - return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, ""); - } - boolean shouldFetchLatest() { return fetchLatest; } } + + /** + * Set headers for image length and if available, md5. + * + * @throws IOException + */ + static void setVerificationHeadersForPut(HttpURLConnection connection, + File file) throws IOException { + connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH, + String.valueOf(file.length())); + MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file); + if (hash != null) { + connection + .setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString()); + } + } + + /** + * Set the required parameters for uploading image + * + * @param httpMethod instance of method to set the parameters + * @param storage colon separated storageInfo string + * @param txid txid of the image + * @param imageFileSize size of the imagefile to be uploaded + * @param nnf NameNodeFile Type + * @return Returns map of parameters to be used with PUT request. + */ + static Map getParamsForPutImage(Storage storage, long txid, + long imageFileSize, NameNodeFile nnf) { + Map params = new HashMap(); + params.put(TXID_PARAM, Long.toString(txid)); + params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString()); + // setting the length of the file to be uploaded in separate property as + // Content-Length only supports up to 2GB + params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize)); + params.put(IMAGE_FILE_TYPE, nnf.name()); + return params; + } + + @Override + protected void doPut(final HttpServletRequest request, + final HttpServletResponse response) throws ServletException, IOException { + try { + ServletContext context = getServletContext(); + final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context); + final Configuration conf = (Configuration) getServletContext() + .getAttribute(JspHelper.CURRENT_CONF); + final PutImageParams parsedParams = new PutImageParams(request, response, + conf); + final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); + + validateRequest(context, conf, request, response, nnImage, + parsedParams.getStorageInfoString()); + + UserGroupInformation.getCurrentUser().doAs( + new PrivilegedExceptionAction() { + + @Override + public Void run() throws Exception { + + final long txid = parsedParams.getTxId(); + + final NameNodeFile nnf = parsedParams.getNameNodeFile(); + + if (!currentlyDownloadingCheckpoints.add(txid)) { + response.sendError(HttpServletResponse.SC_CONFLICT, + "Another checkpointer is already in the process of uploading a" + + " checkpoint made at transaction ID " + txid); + return null; + } + try { + if (nnImage.getStorage().findImageFile(nnf, txid) != null) { + response.sendError(HttpServletResponse.SC_CONFLICT, + "Another checkpointer already uploaded an checkpoint " + + "for txid " + txid); + return null; + } + + InputStream stream = request.getInputStream(); + try { + long start = now(); + MD5Hash downloadImageDigest = TransferFsImage + .handleUploadImageRequest(request, txid, + nnImage.getStorage(), stream, + parsedParams.getFileSize(), getThrottler(conf)); + nnImage.saveDigestAndRenameCheckpointImage(nnf, txid, + downloadImageDigest); + // Metrics non-null only when used inside name node + if (metrics != null) { + long elapsed = now() - start; + metrics.addPutImage(elapsed); + } + // Now that we have a new checkpoint, we might be able to + // remove some old ones. + nnImage.purgeOldStorage(nnf); + } finally { + stream.close(); + } + } finally { + currentlyDownloadingCheckpoints.remove(txid); + } + return null; + } + + }); + } catch (Throwable t) { + String errMsg = "PutImage failed. " + StringUtils.stringifyException(t); + response.sendError(HttpServletResponse.SC_GONE, errMsg); + throw new IOException(errMsg); + } + } + + /* + * Params required to handle put image request + */ + static class PutImageParams { + private long txId = -1; + private String storageInfoString = null; + private long fileSize = 0L; + private NameNodeFile nnf; + + public PutImageParams(HttpServletRequest request, + HttpServletResponse response, Configuration conf) throws IOException { + txId = ServletUtil.parseLongParam(request, TXID_PARAM); + storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM); + fileSize = ServletUtil.parseLongParam(request, + TransferFsImage.FILE_LENGTH); + String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE); + nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile + .valueOf(imageType); + if (fileSize == 0 || txId == -1 || storageInfoString == null + || storageInfoString.isEmpty()) { + throw new IOException("Illegal parameters to TransferFsImage"); + } + } + + public long getTxId() { + return txId; + } + + public String getStorageInfoString() { + return storageInfoString; + } + + public long getFileSize() { + return fileSize; + } + + public NameNodeFile getNameNodeFile() { + return nnf; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java index 43952be5b6..89772005b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java @@ -233,8 +233,8 @@ private static void setupServlets(HttpServer2 httpServer, Configuration conf) { CancelDelegationTokenServlet.class, true); httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true); - httpServer.addInternalServlet("getimage", "/getimage", - GetImageServlet.class, true); + httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC, + ImageServlet.class, true); httpServer.addInternalServlet("listPaths", "/listPaths/*", ListPathsServlet.class, false); httpServer.addInternalServlet("data", "/data/*", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index 1b5bf07ed6..a35d362a0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -114,7 +114,6 @@ public class SecondaryNameNode implements Runnable { private InetSocketAddress nameNodeAddr; private volatile boolean shouldRun; private HttpServer2 infoServer; - private URL imageListenURL; private Collection checkpointDirs; private List checkpointEditsDirs; @@ -267,13 +266,11 @@ private void initialize(final Configuration conf, infoServer.setAttribute("secondary.name.node", this); infoServer.setAttribute("name.system.image", checkpointImage); infoServer.setAttribute(JspHelper.CURRENT_CONF, conf); - infoServer.addInternalServlet("getimage", "/getimage", - GetImageServlet.class, true); + infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC, + ImageServlet.class, true); infoServer.start(); LOG.info("Web server init done"); - imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://" - + NetUtils.getHostPortString(infoServer.getConnectorAddress(0))); HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); int connIdx = 0; @@ -487,14 +484,6 @@ private URL getInfoServer() throws IOException { LOG.debug("Will connect to NameNode at " + address); return address.toURL(); } - - /** - * Return the host:port of where this SecondaryNameNode is listening - * for image transfers - */ - private URL getImageListenAddress() { - return imageListenURL; - } /** * Create a new checkpoint @@ -555,8 +544,8 @@ public boolean doCheckpoint() throws IOException { // to make this new uploaded image as the most current image. // long txid = checkpointImage.getLastAppliedTxId(); - TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(), - dstStorage, NameNodeFile.IMAGE, txid); + TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage, + NameNodeFile.IMAGE, txid); // error simulation code for junit test CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java index ed0922f069..07870199d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java @@ -19,18 +19,22 @@ import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.HttpURLConnection; +import java.net.URISyntaxException; import java.net.URL; import java.security.DigestInputStream; import java.security.MessageDigest; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; -import javax.servlet.ServletOutputStream; -import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.commons.logging.Log; @@ -49,10 +53,12 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.web.URLConnectionFactory; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.util.Time; +import org.apache.http.client.utils.URIBuilder; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -65,7 +71,12 @@ public class TransferFsImage { public final static String CONTENT_LENGTH = "Content-Length"; + public final static String FILE_LENGTH = "File-Length"; public final static String MD5_HEADER = "X-MD5-Digest"; + + private final static String CONTENT_TYPE = "Content-Type"; + private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; + @VisibleForTesting static int timeout = 0; private static URLConnectionFactory connectionFactory; @@ -82,14 +93,14 @@ public class TransferFsImage { public static void downloadMostRecentImageToDirectory(URL infoServer, File dir) throws IOException { - String fileId = GetImageServlet.getParamStringForMostRecentImage(); + String fileId = ImageServlet.getParamStringForMostRecentImage(); getFileClient(infoServer, fileId, Lists.newArrayList(dir), null, false); } public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId, Storage dstStorage, boolean needDigest) throws IOException { - String fileid = GetImageServlet.getParamStringForImage(null, + String fileid = ImageServlet.getParamStringForImage(null, imageTxId, dstStorage); String fileName = NNStorage.getCheckpointImageFileName(imageTxId); @@ -104,12 +115,31 @@ public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId, dstFiles.get(0).length() + " bytes."); return hash; } - + + static MD5Hash handleUploadImageRequest(HttpServletRequest request, + long imageTxId, Storage dstStorage, InputStream stream, + long advertisedSize, DataTransferThrottler throttler) throws IOException { + + String fileName = NNStorage.getCheckpointImageFileName(imageTxId); + + List dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName); + if (dstFiles.isEmpty()) { + throw new IOException("No targets in destination storage!"); + } + + MD5Hash advertisedDigest = parseMD5Header(request); + MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true, + advertisedSize, advertisedDigest, fileName, stream, throttler); + LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " + + dstFiles.get(0).length() + " bytes."); + return hash; + } + static void downloadEditsToStorage(URL fsName, RemoteEditLog log, NNStorage dstStorage) throws IOException { assert log.getStartTxId() > 0 && log.getEndTxId() > 0 : "bad log: " + log; - String fileid = GetImageServlet.getParamStringForLog( + String fileid = ImageServlet.getParamStringForLog( log, dstStorage); String finalFileName = NNStorage.getFinalizedEditsFileName( log.getStartTxId(), log.getEndTxId()); @@ -159,22 +189,19 @@ static void downloadEditsToStorage(URL fsName, RemoteEditLog log, * Requests that the NameNode download an image from this node. * * @param fsName the http address for the remote NN - * @param myNNAddress the host/port where the local node is running an - * HTTPServer hosting GetImageServlet + * @param conf Configuration * @param storage the storage directory to transfer the image from * @param nnf the NameNodeFile type of the image * @param txid the transaction ID of the image to be uploaded */ - public static void uploadImageFromStorage(URL fsName, URL myNNAddress, - Storage storage, NameNodeFile nnf, long txid) throws IOException { + public static void uploadImageFromStorage(URL fsName, Configuration conf, + NNStorage storage, NameNodeFile nnf, long txid) throws IOException { - String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid, - myNNAddress, storage); - // this doesn't directly upload an image, but rather asks the NN - // to connect back to the 2NN to download the specified image. + URL url = new URL(fsName, ImageServlet.PATH_SPEC); + long startTime = Time.monotonicNow(); try { - TransferFsImage.getFileClient(fsName, fileid, null, null, false); - } catch (HttpGetFailedException e) { + uploadImage(url, conf, storage, nnf, txid); + } catch (HttpPutFailedException e) { if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) { // this is OK - this means that a previous attempt to upload // this checkpoint succeeded even though we thought it failed. @@ -186,25 +213,105 @@ public static void uploadImageFromStorage(URL fsName, URL myNNAddress, throw e; } } - LOG.info("Uploaded image with txid " + txid + " to namenode at " + - fsName); + double xferSec = Math.max( + ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001); + LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName + + " in " + xferSec + " seconds"); + } + + /* + * Uploads the imagefile using HTTP PUT method + */ + private static void uploadImage(URL url, Configuration conf, + NNStorage storage, NameNodeFile nnf, long txId) throws IOException { + + File imageFile = storage.findImageFile(nnf, txId); + if (imageFile == null) { + throw new IOException("Could not find image with txid " + txId); + } + + HttpURLConnection connection = null; + try { + URIBuilder uriBuilder = new URIBuilder(url.toURI()); + + // write all params for image upload request as query itself. + // Request body contains the image to be uploaded. + Map params = ImageServlet.getParamsForPutImage(storage, + txId, imageFile.length(), nnf); + for (Entry entry : params.entrySet()) { + uriBuilder.addParameter(entry.getKey(), entry.getValue()); + } + + URL urlWithParams = uriBuilder.build().toURL(); + connection = (HttpURLConnection) connectionFactory.openConnection( + urlWithParams, UserGroupInformation.isSecurityEnabled()); + // Set the request to PUT + connection.setRequestMethod("PUT"); + connection.setDoOutput(true); + + + int chunkSize = conf.getInt( + DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY, + DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT); + if (imageFile.length() > chunkSize) { + // using chunked streaming mode to support upload of 2GB+ files and to + // avoid internal buffering. + // this mode should be used only if more than chunkSize data is present + // to upload. otherwise upload may not happen sometimes. + connection.setChunkedStreamingMode(chunkSize); + } + + setTimeout(connection); + + // set headers for verification + ImageServlet.setVerificationHeadersForPut(connection, imageFile); + + // Write the file to output stream. + writeFileToPutRequest(conf, connection, imageFile); + + int responseCode = connection.getResponseCode(); + if (responseCode != HttpURLConnection.HTTP_OK) { + throw new HttpPutFailedException(connection.getResponseMessage(), + responseCode); + } + } catch (AuthenticationException e) { + throw new IOException(e); + } catch (URISyntaxException e) { + throw new IOException(e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + private static void writeFileToPutRequest(Configuration conf, + HttpURLConnection connection, File imageFile) + throws FileNotFoundException, IOException { + connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream"); + connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary"); + OutputStream output = connection.getOutputStream(); + FileInputStream input = new FileInputStream(imageFile); + try { + copyFileToStream(output, imageFile, input, + ImageServlet.getThrottler(conf)); + } finally { + IOUtils.closeStream(input); + IOUtils.closeStream(output); + } } - /** * A server-side method to respond to a getfile http request * Copies the contents of the local file into the output stream. */ - public static void getFileServer(ServletResponse response, File localfile, - FileInputStream infile, - DataTransferThrottler throttler) + public static void copyFileToStream(OutputStream out, File localfile, + FileInputStream infile, DataTransferThrottler throttler) throws IOException { byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE]; - ServletOutputStream out = null; try { CheckpointFaultInjector.getInstance() .aboutToSendFile(localfile); - out = response.getOutputStream(); if (CheckpointFaultInjector.getInstance(). shouldSendShortFile(localfile)) { @@ -250,14 +357,13 @@ public static void getFileServer(ServletResponse response, File localfile, static MD5Hash getFileClient(URL infoServer, String queryString, List localPaths, Storage dstStorage, boolean getChecksum) throws IOException { - URL url = new URL(infoServer, "/getimage?" + queryString); + URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString); LOG.info("Opening connection to " + url); return doGetUrl(url, localPaths, dstStorage, getChecksum); } public static MD5Hash doGetUrl(URL url, List localPaths, Storage dstStorage, boolean getChecksum) throws IOException { - long startTime = Time.monotonicNow(); HttpURLConnection connection; try { connection = (HttpURLConnection) @@ -266,16 +372,7 @@ public static MD5Hash doGetUrl(URL url, List localPaths, throw new IOException(e); } - if (timeout <= 0) { - Configuration conf = new HdfsConfiguration(); - timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY, - DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT); - } - - if (timeout > 0) { - connection.setConnectTimeout(timeout); - connection.setReadTimeout(timeout); - } + setTimeout(connection); if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { throw new HttpGetFailedException( @@ -293,10 +390,37 @@ public static MD5Hash doGetUrl(URL url, List localPaths, throw new IOException(CONTENT_LENGTH + " header is not provided " + "by the namenode when trying to fetch " + url); } - + MD5Hash advertisedDigest = parseMD5Header(connection); + String fsImageName = connection + .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER); + InputStream stream = connection.getInputStream(); + + return receiveFile(url.toExternalForm(), localPaths, dstStorage, + getChecksum, advertisedSize, advertisedDigest, fsImageName, stream, + null); + } + + private static void setTimeout(HttpURLConnection connection) { + if (timeout <= 0) { + Configuration conf = new HdfsConfiguration(); + timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY, + DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT); + LOG.info("Image Transfer timeout configured to " + timeout + + " milliseconds"); + } + + if (timeout > 0) { + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + } + } + + private static MD5Hash receiveFile(String url, List localPaths, + Storage dstStorage, boolean getChecksum, long advertisedSize, + MD5Hash advertisedDigest, String fsImageName, InputStream stream, + DataTransferThrottler throttler) throws IOException { + long startTime = Time.monotonicNow(); if (localPaths != null) { - String fsImageName = connection.getHeaderField( - GetImageServlet.HADOOP_IMAGE_EDITS_HEADER); // If the local paths refer to directories, use the server-provided header // as the filename within that directory List newLocalPaths = new ArrayList(); @@ -313,10 +437,8 @@ public static MD5Hash doGetUrl(URL url, List localPaths, localPaths = newLocalPaths; } - MD5Hash advertisedDigest = parseMD5Header(connection); long received = 0; - InputStream stream = connection.getInputStream(); MessageDigest digester = null; if (getChecksum) { digester = MD5Hash.getDigester(); @@ -361,6 +483,9 @@ public static MD5Hash doGetUrl(URL url, List localPaths, for (FileOutputStream fos : outputStreams) { fos.write(buf, 0, num); } + if (throttler != null) { + throttler.throttle(num); + } } } finishedReceiving = true; @@ -404,7 +529,12 @@ private static MD5Hash parseMD5Header(HttpURLConnection connection) { String header = connection.getHeaderField(MD5_HEADER); return (header != null) ? new MD5Hash(header) : null; } - + + private static MD5Hash parseMD5Header(HttpServletRequest request) { + String header = request.getHeader(MD5_HEADER); + return (header != null) ? new MD5Hash(header) : null; + } + public static class HttpGetFailedException extends IOException { private static final long serialVersionUID = 1L; private final int responseCode; @@ -419,4 +549,18 @@ public int getResponseCode() { } } + public static class HttpPutFailedException extends IOException { + private static final long serialVersionUID = 1L; + private final int responseCode; + + HttpPutFailedException(String msg, int responseCode) throws IOException { + super(msg); + this.responseCode = responseCode; + } + + public int getResponseCode() { + return responseCode; + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java index 7aa6077a8b..a80c877472 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java @@ -63,6 +63,7 @@ public class StandbyCheckpointer { private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class); private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L; private final CheckpointConf checkpointConf; + private final Configuration conf; private final FSNamesystem namesystem; private long lastCheckpointTime; private final CheckpointerThread thread; @@ -80,6 +81,7 @@ public class StandbyCheckpointer { public StandbyCheckpointer(Configuration conf, FSNamesystem ns) throws IOException { this.namesystem = ns; + this.conf = conf; this.checkpointConf = new CheckpointConf(conf); this.thread = new CheckpointerThread(); this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true) @@ -193,7 +195,7 @@ private void doCheckpoint() throws InterruptedException, IOException { Future upload = executor.submit(new Callable() { @Override public Void call() throws IOException { - TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress, + TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid); return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 90e72357bf..ca5e4b89a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -858,15 +858,13 @@ dfs.image.transfer.timeout - 600000 + 60000 - Timeout for image transfer in milliseconds. This timeout and the related + Socket timeout for image transfer in milliseconds. This timeout and the related dfs.image.transfer.bandwidthPerSec parameter should be configured such - that normal image transfer can complete within the timeout. + that normal image transfer can complete successfully. This timeout prevents client hangs when the sender fails during - image transfer, which is particularly important during checkpointing. - Note that this timeout applies to the entirety of image transfer, and - is not a socket timeout. + image transfer. This is socket timeout during image tranfer. @@ -883,6 +881,16 @@ + + dfs.image.transfer.chunksize + 65536 + + Chunksize in bytes to upload the checkpoint. + Chunked streaming is used to avoid internal buffering of contents + of image file of huge size. + + + dfs.namenode.support.allow.format true diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java index 735a1fa907..ca8e4b6540 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java @@ -31,6 +31,7 @@ import static org.junit.Assert.fail; import java.io.File; +import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; import java.io.RandomAccessFile; @@ -554,23 +555,9 @@ public void testSecondaryNamenodeError3() throws IOException { } /** - * Simulate a secondary node failure to transfer image - * back to the name-node. - * Used to truncate primary fsimage file. - */ - @Test - public void testSecondaryFailsToReturnImage() throws IOException { - Mockito.doThrow(new IOException("If this exception is not caught by the " + - "name-node, fs image will be truncated.")) - .when(faultInjector).aboutToSendFile(filePathContaining("secondary")); - - doSecondaryFailsToReturnImage(); - } - - /** - * Similar to above test, but uses an unchecked Error, and causes it - * before even setting the length header. This used to cause image - * truncation. Regression test for HDFS-3330. + * Simulate a secondary node failure to transfer image. Uses an unchecked + * error and fail transfer before even setting the length header. This used to + * cause image truncation. Regression test for HDFS-3330. */ @Test public void testSecondaryFailsWithErrorBeforeSettingHeaders() @@ -1975,7 +1962,14 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException { Mockito.doReturn(Lists.newArrayList(new File("/wont-be-written"))) .when(dstImage).getFiles( Mockito.anyObject(), Mockito.anyString()); - + + File mockImageFile = File.createTempFile("image", ""); + FileOutputStream imageFile = new FileOutputStream(mockImageFile); + imageFile.write("data".getBytes()); + imageFile.close(); + Mockito.doReturn(mockImageFile).when(dstImage) + .findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong()); + Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString()) .when(dstImage).toColonSeparatedString(); @@ -1996,8 +1990,8 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException { } try { - TransferFsImage.uploadImageFromStorage(fsName, new URL( - "http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0); + TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage, + NameNodeFile.IMAGE, 0); fail("Storage info was not verified"); } catch (IOException ioe) { String msg = StringUtils.stringifyException(ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java index bffa54f6f2..5249d9fe3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java @@ -69,7 +69,7 @@ public void testIsValidRequestor() throws IOException { Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls); // Make sure that NN2 is considered a valid fsimage/edits requestor. - assertTrue(GetImageServlet.isValidRequestor(context, + assertTrue(ImageServlet.isValidRequestor(context, "hdfs/host2@TEST-REALM.COM", conf)); // Mark atm as an admin. @@ -81,15 +81,15 @@ public boolean matches(Object argument) { }))).thenReturn(true); // Make sure that NN2 is still considered a valid requestor. - assertTrue(GetImageServlet.isValidRequestor(context, + assertTrue(ImageServlet.isValidRequestor(context, "hdfs/host2@TEST-REALM.COM", conf)); // Make sure an admin is considered a valid requestor. - assertTrue(GetImageServlet.isValidRequestor(context, + assertTrue(ImageServlet.isValidRequestor(context, "atm@TEST-REALM.COM", conf)); // Make sure other users are *not* considered valid requestors. - assertFalse(GetImageServlet.isValidRequestor(context, + assertFalse(ImageServlet.isValidRequestor(context, "todd@TEST-REALM.COM", conf)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java index 14d4441b4c..fd03759c8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java @@ -22,6 +22,7 @@ import static org.junit.Assert.fail; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.net.SocketTimeoutException; import java.net.URL; @@ -34,9 +35,11 @@ import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.HttpServerFunctionalTest; import org.apache.hadoop.test.PathUtils; @@ -118,10 +121,11 @@ public void testClientSideExceptionOnJustOneDir() throws IOException { * Test to verify the read timeout */ @Test(timeout = 5000) - public void testImageTransferTimeout() throws Exception { + public void testGetImageTimeout() throws Exception { HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs"); try { - testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class); + testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC, + TestImageTransferServlet.class); testServer.start(); URL serverURL = HttpServerFunctionalTest.getServerURL(testServer); TransferFsImage.timeout = 2000; @@ -139,7 +143,48 @@ public void testImageTransferTimeout() throws Exception { } } - public static class TestGetImageServlet extends HttpServlet { + /** + * Test to verify the timeout of Image upload + */ + @Test(timeout = 10000) + public void testImageUploadTimeout() throws Exception { + Configuration conf = new HdfsConfiguration(); + NNStorage mockStorage = Mockito.mock(NNStorage.class); + HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs"); + try { + testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC, + TestImageTransferServlet.class); + testServer.start(); + URL serverURL = HttpServerFunctionalTest.getServerURL(testServer); + // set the timeout here, otherwise it will take default. + TransferFsImage.timeout = 2000; + + File tmpDir = new File(new FileSystemTestHelper().getTestRootDir()); + tmpDir.mkdirs(); + + File mockImageFile = File.createTempFile("image", "", tmpDir); + FileOutputStream imageFile = new FileOutputStream(mockImageFile); + imageFile.write("data".getBytes()); + imageFile.close(); + Mockito.when( + mockStorage.findImageFile(Mockito.any(NameNodeFile.class), + Mockito.anyLong())).thenReturn(mockImageFile); + Mockito.when(mockStorage.toColonSeparatedString()).thenReturn( + "storage:info:string"); + + try { + TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage, + NameNodeFile.IMAGE, 1L); + fail("TransferImage Should fail with timeout"); + } catch (SocketTimeoutException e) { + assertEquals("Upload should timeout", "Read timed out", e.getMessage()); + } + } finally { + testServer.stop(); + } + } + + public static class TestImageTransferServlet extends HttpServlet { private static final long serialVersionUID = 1L; @Override @@ -153,5 +198,17 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) } } } + + @Override + protected void doPut(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + synchronized (this) { + try { + wait(5000); + } catch (InterruptedException e) { + // Ignore + } + } + } } }