HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET to send merged fsimages. Contributed by Vinayakumar B.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575611 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dbd22b23c2
commit
94a1632fcb
@ -122,6 +122,9 @@ Trunk (Unreleased)
|
||||
|
||||
HDFS-5138. Support HDFS upgrade in HA. (atm via todd)
|
||||
|
||||
HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET
|
||||
to send merged fsimages. (Vinayakumar B via wang)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -460,7 +460,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
|
||||
// Image transfer timeout
|
||||
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
|
||||
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000;
|
||||
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
|
||||
|
||||
// Image transfer chunksize
|
||||
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
|
||||
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
|
||||
|
||||
//Keys with no defaults
|
||||
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
|
||||
|
@ -43,7 +43,7 @@
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
|
||||
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
@ -205,15 +205,16 @@ public void doGet(final HttpServletRequest request,
|
||||
return;
|
||||
}
|
||||
editFile = elf.getFile();
|
||||
GetImageServlet.setVerificationHeaders(response, editFile);
|
||||
GetImageServlet.setFileNameHeaders(response, editFile);
|
||||
ImageServlet.setVerificationHeadersForGet(response, editFile);
|
||||
ImageServlet.setFileNameHeaders(response, editFile);
|
||||
editFileIn = new FileInputStream(editFile);
|
||||
}
|
||||
|
||||
DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
|
||||
DataTransferThrottler throttler = ImageServlet.getThrottler(conf);
|
||||
|
||||
// send edits
|
||||
TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
|
||||
TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
|
||||
editFileIn, throttler);
|
||||
|
||||
} catch (Throwable t) {
|
||||
String errMsg = "getedit failed. " + StringUtils.stringifyException(t);
|
||||
|
@ -263,8 +263,7 @@ void doCheckpoint() throws IOException {
|
||||
}
|
||||
|
||||
if(cpCmd.needToReturnImage()) {
|
||||
TransferFsImage.uploadImageFromStorage(
|
||||
backupNode.nnHttpAddress, getImageListenAddress(),
|
||||
TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
|
||||
bnStorage, NameNodeFile.IMAGE, txid);
|
||||
}
|
||||
|
||||
|
@ -19,11 +19,10 @@
|
||||
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
import java.net.HttpURLConnection;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.*;
|
||||
import java.io.*;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.ServletException;
|
||||
@ -32,7 +31,6 @@
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -57,18 +55,21 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.net.InetAddresses;
|
||||
|
||||
/**
|
||||
* This class is used in Namesystem's jetty to retrieve a file.
|
||||
* This class is used in Namesystem's jetty to retrieve/upload a file
|
||||
* Typically used by the Secondary NameNode to retrieve image and
|
||||
* edit file for periodic checkpointing.
|
||||
* edit file for periodic checkpointing in Non-HA deployments.
|
||||
* Standby NameNode uses to upload checkpoints in HA deployments.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class GetImageServlet extends HttpServlet {
|
||||
public class ImageServlet extends HttpServlet {
|
||||
|
||||
public static final String PATH_SPEC = "/imagetransfer";
|
||||
|
||||
private static final long serialVersionUID = -7669068179452648952L;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
|
||||
private static final Log LOG = LogFactory.getLog(ImageServlet.class);
|
||||
|
||||
public final static String CONTENT_DISPOSITION = "Content-Disposition";
|
||||
public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
|
||||
@ -85,8 +86,7 @@ public class GetImageServlet extends HttpServlet {
|
||||
|
||||
@Override
|
||||
public void doGet(final HttpServletRequest request,
|
||||
final HttpServletResponse response
|
||||
) throws ServletException, IOException {
|
||||
final HttpServletResponse response) throws ServletException, IOException {
|
||||
try {
|
||||
final ServletContext context = getServletContext();
|
||||
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
|
||||
@ -94,29 +94,10 @@ public void doGet(final HttpServletRequest request,
|
||||
final Configuration conf = (Configuration) context
|
||||
.getAttribute(JspHelper.CURRENT_CONF);
|
||||
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled() &&
|
||||
!isValidRequestor(context, request.getUserPrincipal().getName(), conf)) {
|
||||
response.sendError(HttpServletResponse.SC_FORBIDDEN,
|
||||
"Only Namenode, Secondary Namenode, and administrators may access " +
|
||||
"this servlet");
|
||||
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
|
||||
+ request.getUserPrincipal().getName() + " at " + request.getRemoteHost());
|
||||
return;
|
||||
}
|
||||
|
||||
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
|
||||
String theirStorageInfoString = parsedParams.getStorageInfoString();
|
||||
if (theirStorageInfoString != null &&
|
||||
!myStorageInfoString.equals(theirStorageInfoString)) {
|
||||
response.sendError(HttpServletResponse.SC_FORBIDDEN,
|
||||
"This namenode has storage info " + myStorageInfoString +
|
||||
" but the secondary expected " + theirStorageInfoString);
|
||||
LOG.warn("Received an invalid request file transfer request " +
|
||||
"from a secondary with storage info " + theirStorageInfoString);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
validateRequest(context, conf, request, response, nnImage,
|
||||
parsedParams.getStorageInfoString());
|
||||
|
||||
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
@ -155,53 +136,6 @@ public Void run() throws Exception {
|
||||
long elapsed = now() - start;
|
||||
metrics.addGetEdit(elapsed);
|
||||
}
|
||||
} else if (parsedParams.isPutImage()) {
|
||||
final long txid = parsedParams.getTxId();
|
||||
final NameNodeFile nnf = parsedParams.getNameNodeFile();
|
||||
|
||||
if (! currentlyDownloadingCheckpoints.add(txid)) {
|
||||
response.sendError(HttpServletResponse.SC_CONFLICT,
|
||||
"Another checkpointer is already in the process of uploading a" +
|
||||
" checkpoint made at transaction ID " + txid);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
|
||||
response.sendError(HttpServletResponse.SC_CONFLICT,
|
||||
"Another checkpointer already uploaded an checkpoint " +
|
||||
"for txid " + txid);
|
||||
return null;
|
||||
}
|
||||
|
||||
// We may have lost our ticket since last checkpoint, log in again, just in case
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
|
||||
}
|
||||
|
||||
long start = now();
|
||||
// issue a HTTP get request to download the new fsimage
|
||||
MD5Hash downloadImageDigest = TransferFsImage
|
||||
.downloadImageToStorage(parsedParams.getInfoServer(conf),
|
||||
txid, nnImage.getStorage(), true);
|
||||
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
|
||||
downloadImageDigest);
|
||||
if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
|
||||
NameNodeHttpServer.getNameNodeFromContext(context)
|
||||
.getNamesystem().setCreatedRollbackImages(true);
|
||||
}
|
||||
|
||||
if (metrics != null) { // Metrics non-null only when used inside name node
|
||||
long elapsed = now() - start;
|
||||
metrics.addPutImage(elapsed);
|
||||
}
|
||||
|
||||
// Now that we have a new checkpoint, we might be able to
|
||||
// remove some old ones.
|
||||
nnImage.purgeOldStorage(nnf);
|
||||
} finally {
|
||||
currentlyDownloadingCheckpoints.remove(txid);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -209,7 +143,7 @@ public Void run() throws Exception {
|
||||
private void serveFile(File file) throws IOException {
|
||||
FileInputStream fis = new FileInputStream(file);
|
||||
try {
|
||||
setVerificationHeaders(response, file);
|
||||
setVerificationHeadersForGet(response, file);
|
||||
setFileNameHeaders(response, file);
|
||||
if (!file.exists()) {
|
||||
// Potential race where the file was deleted while we were in the
|
||||
@ -221,8 +155,8 @@ private void serveFile(File file) throws IOException {
|
||||
// detected by the client side as an inaccurate length header.
|
||||
}
|
||||
// send file
|
||||
TransferFsImage.getFileServer(response, file, fis,
|
||||
getThrottler(conf));
|
||||
TransferFsImage.copyFileToStream(response.getOutputStream(),
|
||||
file, fis, getThrottler(conf));
|
||||
} finally {
|
||||
IOUtils.closeStream(fis);
|
||||
}
|
||||
@ -237,7 +171,36 @@ private void serveFile(File file) throws IOException {
|
||||
response.getOutputStream().close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void validateRequest(ServletContext context, Configuration conf,
|
||||
HttpServletRequest request, HttpServletResponse response,
|
||||
FSImage nnImage, String theirStorageInfoString) throws IOException {
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()
|
||||
&& !isValidRequestor(context, request.getUserPrincipal().getName(),
|
||||
conf)) {
|
||||
String errorMsg = "Only Namenode, Secondary Namenode, and administrators may access "
|
||||
+ "this servlet";
|
||||
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
|
||||
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
|
||||
+ request.getUserPrincipal().getName()
|
||||
+ " at "
|
||||
+ request.getRemoteHost());
|
||||
throw new IOException(errorMsg);
|
||||
}
|
||||
|
||||
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
|
||||
if (theirStorageInfoString != null
|
||||
&& !myStorageInfoString.equals(theirStorageInfoString)) {
|
||||
String errorMsg = "This namenode has storage info " + myStorageInfoString
|
||||
+ " but the secondary expected " + theirStorageInfoString;
|
||||
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
|
||||
LOG.warn("Received an invalid request file transfer request "
|
||||
+ "from a secondary with storage info " + theirStorageInfoString);
|
||||
throw new IOException(errorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
public static void setFileNameHeaders(HttpServletResponse response,
|
||||
File file) {
|
||||
response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
|
||||
@ -264,43 +227,40 @@ public final static DataTransferThrottler getThrottler(Configuration conf) {
|
||||
@VisibleForTesting
|
||||
static boolean isValidRequestor(ServletContext context, String remoteUser,
|
||||
Configuration conf) throws IOException {
|
||||
if(remoteUser == null) { // This really shouldn't happen...
|
||||
if (remoteUser == null) { // This really shouldn't happen...
|
||||
LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
Set<String> validRequestors = new HashSet<String>();
|
||||
|
||||
validRequestors.add(
|
||||
SecurityUtil.getServerPrincipal(conf
|
||||
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
|
||||
.getAddress(conf).getHostName()));
|
||||
validRequestors.add(
|
||||
SecurityUtil.getServerPrincipal(conf
|
||||
.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
|
||||
SecondaryNameNode.getHttpAddress(conf).getHostName()));
|
||||
validRequestors.add(SecurityUtil.getServerPrincipal(conf
|
||||
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
|
||||
NameNode.getAddress(conf).getHostName()));
|
||||
validRequestors.add(SecurityUtil.getServerPrincipal(
|
||||
conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
|
||||
SecondaryNameNode.getHttpAddress(conf).getHostName()));
|
||||
|
||||
if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
|
||||
Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
|
||||
validRequestors.add(
|
||||
SecurityUtil.getServerPrincipal(otherNnConf
|
||||
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
|
||||
NameNode.getAddress(otherNnConf).getHostName()));
|
||||
validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
|
||||
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
|
||||
NameNode.getAddress(otherNnConf).getHostName()));
|
||||
}
|
||||
|
||||
for(String v : validRequestors) {
|
||||
if(v != null && v.equals(remoteUser)) {
|
||||
LOG.info("GetImageServlet allowing checkpointer: " + remoteUser);
|
||||
for (String v : validRequestors) {
|
||||
if (v != null && v.equals(remoteUser)) {
|
||||
LOG.info("ImageServlet allowing checkpointer: " + remoteUser);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) {
|
||||
LOG.info("GetImageServlet allowing administrator: " + remoteUser);
|
||||
LOG.info("ImageServlet allowing administrator: " + remoteUser);
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG.info("GetImageServlet rejecting: " + remoteUser);
|
||||
|
||||
LOG.info("ImageServlet rejecting: " + remoteUser);
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -308,8 +268,8 @@ static boolean isValidRequestor(ServletContext context, String remoteUser,
|
||||
* Set headers for content length, and, if available, md5.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void setVerificationHeaders(HttpServletResponse response, File file)
|
||||
throws IOException {
|
||||
public static void setVerificationHeadersForGet(HttpServletResponse response,
|
||||
File file) throws IOException {
|
||||
response.setHeader(TransferFsImage.CONTENT_LENGTH,
|
||||
String.valueOf(file.length()));
|
||||
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
|
||||
@ -339,30 +299,10 @@ static String getParamStringForLog(RemoteEditLog log,
|
||||
+ "&" + STORAGEINFO_PARAM + "=" +
|
||||
remoteStorageInfo.toColonSeparatedString();
|
||||
}
|
||||
|
||||
static String getParamStringToPutImage(NameNodeFile nnf, long txid,
|
||||
URL url, Storage storage) {
|
||||
InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url
|
||||
.getAuthority());
|
||||
String machine = !imageListenAddress.isUnresolved()
|
||||
&& imageListenAddress.getAddress().isAnyLocalAddress() ? null
|
||||
: imageListenAddress.getHostName();
|
||||
return "putimage=1" +
|
||||
"&" + TXID_PARAM + "=" + txid +
|
||||
"&" + IMAGE_FILE_TYPE + "=" + nnf.name() +
|
||||
"&port=" + imageListenAddress.getPort() +
|
||||
(machine != null ? "&machine=" + machine : "")
|
||||
+ "&" + STORAGEINFO_PARAM + "=" +
|
||||
storage.toColonSeparatedString();
|
||||
}
|
||||
|
||||
|
||||
static class GetImageParams {
|
||||
private boolean isGetImage;
|
||||
private boolean isGetEdit;
|
||||
private boolean isPutImage;
|
||||
private int remoteport;
|
||||
private String machineName;
|
||||
private NameNodeFile nnf;
|
||||
private long startTxId, endTxId, txId;
|
||||
private String storageInfoString;
|
||||
@ -378,8 +318,7 @@ public GetImageParams(HttpServletRequest request,
|
||||
) throws IOException {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, String[]> pmap = request.getParameterMap();
|
||||
isGetImage = isGetEdit = isPutImage = fetchLatest = false;
|
||||
remoteport = 0;
|
||||
isGetImage = isGetEdit = fetchLatest = false;
|
||||
|
||||
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
@ -402,30 +341,13 @@ public GetImageParams(HttpServletRequest request,
|
||||
isGetEdit = true;
|
||||
startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM);
|
||||
endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM);
|
||||
} else if (key.equals("putimage")) {
|
||||
isPutImage = true;
|
||||
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
|
||||
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
|
||||
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
|
||||
.valueOf(imageType);
|
||||
} else if (key.equals("port")) {
|
||||
remoteport = new Integer(val[0]).intValue();
|
||||
} else if (key.equals("machine")) {
|
||||
machineName = val[0];
|
||||
} else if (key.equals(STORAGEINFO_PARAM)) {
|
||||
storageInfoString = val[0];
|
||||
}
|
||||
}
|
||||
|
||||
if (machineName == null) {
|
||||
machineName = request.getRemoteHost();
|
||||
if (InetAddresses.isInetAddress(machineName)) {
|
||||
machineName = NetUtils.getHostNameOfIP(machineName);
|
||||
}
|
||||
}
|
||||
|
||||
int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
|
||||
if ((numGets > 1) || (numGets == 0) && !isPutImage) {
|
||||
if ((numGets > 1) || (numGets == 0)) {
|
||||
throw new IOException("Illegal parameters to TransferFsImage");
|
||||
}
|
||||
}
|
||||
@ -435,12 +357,12 @@ public String getStorageInfoString() {
|
||||
}
|
||||
|
||||
public long getTxId() {
|
||||
Preconditions.checkState(isGetImage || isPutImage);
|
||||
Preconditions.checkState(isGetImage);
|
||||
return txId;
|
||||
}
|
||||
|
||||
public NameNodeFile getNameNodeFile() {
|
||||
Preconditions.checkState(isPutImage || isGetImage);
|
||||
Preconditions.checkState(isGetImage);
|
||||
return nnf;
|
||||
}
|
||||
|
||||
@ -462,20 +384,161 @@ boolean isGetImage() {
|
||||
return isGetImage;
|
||||
}
|
||||
|
||||
boolean isPutImage() {
|
||||
return isPutImage;
|
||||
}
|
||||
|
||||
URL getInfoServer(Configuration conf) throws IOException {
|
||||
if (machineName == null || remoteport == 0) {
|
||||
throw new IOException("MachineName and port undefined");
|
||||
}
|
||||
return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, "");
|
||||
}
|
||||
|
||||
boolean shouldFetchLatest() {
|
||||
return fetchLatest;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Set headers for image length and if available, md5.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
static void setVerificationHeadersForPut(HttpURLConnection connection,
|
||||
File file) throws IOException {
|
||||
connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
|
||||
String.valueOf(file.length()));
|
||||
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
|
||||
if (hash != null) {
|
||||
connection
|
||||
.setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the required parameters for uploading image
|
||||
*
|
||||
* @param httpMethod instance of method to set the parameters
|
||||
* @param storage colon separated storageInfo string
|
||||
* @param txid txid of the image
|
||||
* @param imageFileSize size of the imagefile to be uploaded
|
||||
* @param nnf NameNodeFile Type
|
||||
* @return Returns map of parameters to be used with PUT request.
|
||||
*/
|
||||
static Map<String, String> getParamsForPutImage(Storage storage, long txid,
|
||||
long imageFileSize, NameNodeFile nnf) {
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
params.put(TXID_PARAM, Long.toString(txid));
|
||||
params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
|
||||
// setting the length of the file to be uploaded in separate property as
|
||||
// Content-Length only supports up to 2GB
|
||||
params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
|
||||
params.put(IMAGE_FILE_TYPE, nnf.name());
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPut(final HttpServletRequest request,
|
||||
final HttpServletResponse response) throws ServletException, IOException {
|
||||
try {
|
||||
ServletContext context = getServletContext();
|
||||
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
|
||||
final Configuration conf = (Configuration) getServletContext()
|
||||
.getAttribute(JspHelper.CURRENT_CONF);
|
||||
final PutImageParams parsedParams = new PutImageParams(request, response,
|
||||
conf);
|
||||
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
|
||||
|
||||
validateRequest(context, conf, request, response, nnImage,
|
||||
parsedParams.getStorageInfoString());
|
||||
|
||||
UserGroupInformation.getCurrentUser().doAs(
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
|
||||
final long txid = parsedParams.getTxId();
|
||||
|
||||
final NameNodeFile nnf = parsedParams.getNameNodeFile();
|
||||
|
||||
if (!currentlyDownloadingCheckpoints.add(txid)) {
|
||||
response.sendError(HttpServletResponse.SC_CONFLICT,
|
||||
"Another checkpointer is already in the process of uploading a"
|
||||
+ " checkpoint made at transaction ID " + txid);
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
|
||||
response.sendError(HttpServletResponse.SC_CONFLICT,
|
||||
"Another checkpointer already uploaded an checkpoint "
|
||||
+ "for txid " + txid);
|
||||
return null;
|
||||
}
|
||||
|
||||
InputStream stream = request.getInputStream();
|
||||
try {
|
||||
long start = now();
|
||||
MD5Hash downloadImageDigest = TransferFsImage
|
||||
.handleUploadImageRequest(request, txid,
|
||||
nnImage.getStorage(), stream,
|
||||
parsedParams.getFileSize(), getThrottler(conf));
|
||||
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
|
||||
downloadImageDigest);
|
||||
// Metrics non-null only when used inside name node
|
||||
if (metrics != null) {
|
||||
long elapsed = now() - start;
|
||||
metrics.addPutImage(elapsed);
|
||||
}
|
||||
// Now that we have a new checkpoint, we might be able to
|
||||
// remove some old ones.
|
||||
nnImage.purgeOldStorage(nnf);
|
||||
} finally {
|
||||
stream.close();
|
||||
}
|
||||
} finally {
|
||||
currentlyDownloadingCheckpoints.remove(txid);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
String errMsg = "PutImage failed. " + StringUtils.stringifyException(t);
|
||||
response.sendError(HttpServletResponse.SC_GONE, errMsg);
|
||||
throw new IOException(errMsg);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Params required to handle put image request
|
||||
*/
|
||||
static class PutImageParams {
|
||||
private long txId = -1;
|
||||
private String storageInfoString = null;
|
||||
private long fileSize = 0L;
|
||||
private NameNodeFile nnf;
|
||||
|
||||
public PutImageParams(HttpServletRequest request,
|
||||
HttpServletResponse response, Configuration conf) throws IOException {
|
||||
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
|
||||
storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
|
||||
fileSize = ServletUtil.parseLongParam(request,
|
||||
TransferFsImage.FILE_LENGTH);
|
||||
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
|
||||
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
|
||||
.valueOf(imageType);
|
||||
if (fileSize == 0 || txId == -1 || storageInfoString == null
|
||||
|| storageInfoString.isEmpty()) {
|
||||
throw new IOException("Illegal parameters to TransferFsImage");
|
||||
}
|
||||
}
|
||||
|
||||
public long getTxId() {
|
||||
return txId;
|
||||
}
|
||||
|
||||
public String getStorageInfoString() {
|
||||
return storageInfoString;
|
||||
}
|
||||
|
||||
public long getFileSize() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
public NameNodeFile getNameNodeFile() {
|
||||
return nnf;
|
||||
}
|
||||
}
|
||||
}
|
@ -233,8 +233,8 @@ private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
|
||||
CancelDelegationTokenServlet.class, true);
|
||||
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
|
||||
true);
|
||||
httpServer.addInternalServlet("getimage", "/getimage",
|
||||
GetImageServlet.class, true);
|
||||
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
|
||||
ImageServlet.class, true);
|
||||
httpServer.addInternalServlet("listPaths", "/listPaths/*",
|
||||
ListPathsServlet.class, false);
|
||||
httpServer.addInternalServlet("data", "/data/*",
|
||||
|
@ -114,7 +114,6 @@ public class SecondaryNameNode implements Runnable {
|
||||
private InetSocketAddress nameNodeAddr;
|
||||
private volatile boolean shouldRun;
|
||||
private HttpServer2 infoServer;
|
||||
private URL imageListenURL;
|
||||
|
||||
private Collection<URI> checkpointDirs;
|
||||
private List<URI> checkpointEditsDirs;
|
||||
@ -267,13 +266,11 @@ private void initialize(final Configuration conf,
|
||||
infoServer.setAttribute("secondary.name.node", this);
|
||||
infoServer.setAttribute("name.system.image", checkpointImage);
|
||||
infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
|
||||
infoServer.addInternalServlet("getimage", "/getimage",
|
||||
GetImageServlet.class, true);
|
||||
infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
|
||||
ImageServlet.class, true);
|
||||
infoServer.start();
|
||||
|
||||
LOG.info("Web server init done");
|
||||
imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://"
|
||||
+ NetUtils.getHostPortString(infoServer.getConnectorAddress(0)));
|
||||
|
||||
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
|
||||
int connIdx = 0;
|
||||
@ -487,14 +484,6 @@ private URL getInfoServer() throws IOException {
|
||||
LOG.debug("Will connect to NameNode at " + address);
|
||||
return address.toURL();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the host:port of where this SecondaryNameNode is listening
|
||||
* for image transfers
|
||||
*/
|
||||
private URL getImageListenAddress() {
|
||||
return imageListenURL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new checkpoint
|
||||
@ -555,8 +544,8 @@ public boolean doCheckpoint() throws IOException {
|
||||
// to make this new uploaded image as the most current image.
|
||||
//
|
||||
long txid = checkpointImage.getLastAppliedTxId();
|
||||
TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
|
||||
dstStorage, NameNodeFile.IMAGE, txid);
|
||||
TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
|
||||
NameNodeFile.IMAGE, txid);
|
||||
|
||||
// error simulation code for junit test
|
||||
CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();
|
||||
|
@ -19,18 +19,22 @@
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.security.DigestInputStream;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -49,10 +53,12 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -65,7 +71,12 @@
|
||||
public class TransferFsImage {
|
||||
|
||||
public final static String CONTENT_LENGTH = "Content-Length";
|
||||
public final static String FILE_LENGTH = "File-Length";
|
||||
public final static String MD5_HEADER = "X-MD5-Digest";
|
||||
|
||||
private final static String CONTENT_TYPE = "Content-Type";
|
||||
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
|
||||
|
||||
@VisibleForTesting
|
||||
static int timeout = 0;
|
||||
private static URLConnectionFactory connectionFactory;
|
||||
@ -82,14 +93,14 @@ public class TransferFsImage {
|
||||
|
||||
public static void downloadMostRecentImageToDirectory(URL infoServer,
|
||||
File dir) throws IOException {
|
||||
String fileId = GetImageServlet.getParamStringForMostRecentImage();
|
||||
String fileId = ImageServlet.getParamStringForMostRecentImage();
|
||||
getFileClient(infoServer, fileId, Lists.newArrayList(dir),
|
||||
null, false);
|
||||
}
|
||||
|
||||
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
|
||||
Storage dstStorage, boolean needDigest) throws IOException {
|
||||
String fileid = GetImageServlet.getParamStringForImage(null,
|
||||
String fileid = ImageServlet.getParamStringForImage(null,
|
||||
imageTxId, dstStorage);
|
||||
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
|
||||
|
||||
@ -104,12 +115,31 @@ public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
|
||||
dstFiles.get(0).length() + " bytes.");
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
||||
static MD5Hash handleUploadImageRequest(HttpServletRequest request,
|
||||
long imageTxId, Storage dstStorage, InputStream stream,
|
||||
long advertisedSize, DataTransferThrottler throttler) throws IOException {
|
||||
|
||||
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
|
||||
|
||||
List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
|
||||
if (dstFiles.isEmpty()) {
|
||||
throw new IOException("No targets in destination storage!");
|
||||
}
|
||||
|
||||
MD5Hash advertisedDigest = parseMD5Header(request);
|
||||
MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
|
||||
advertisedSize, advertisedDigest, fileName, stream, throttler);
|
||||
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
|
||||
+ dstFiles.get(0).length() + " bytes.");
|
||||
return hash;
|
||||
}
|
||||
|
||||
static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
|
||||
NNStorage dstStorage) throws IOException {
|
||||
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
|
||||
"bad log: " + log;
|
||||
String fileid = GetImageServlet.getParamStringForLog(
|
||||
String fileid = ImageServlet.getParamStringForLog(
|
||||
log, dstStorage);
|
||||
String finalFileName = NNStorage.getFinalizedEditsFileName(
|
||||
log.getStartTxId(), log.getEndTxId());
|
||||
@ -159,22 +189,19 @@ static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
|
||||
* Requests that the NameNode download an image from this node.
|
||||
*
|
||||
* @param fsName the http address for the remote NN
|
||||
* @param myNNAddress the host/port where the local node is running an
|
||||
* HTTPServer hosting GetImageServlet
|
||||
* @param conf Configuration
|
||||
* @param storage the storage directory to transfer the image from
|
||||
* @param nnf the NameNodeFile type of the image
|
||||
* @param txid the transaction ID of the image to be uploaded
|
||||
*/
|
||||
public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
|
||||
Storage storage, NameNodeFile nnf, long txid) throws IOException {
|
||||
public static void uploadImageFromStorage(URL fsName, Configuration conf,
|
||||
NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
|
||||
|
||||
String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid,
|
||||
myNNAddress, storage);
|
||||
// this doesn't directly upload an image, but rather asks the NN
|
||||
// to connect back to the 2NN to download the specified image.
|
||||
URL url = new URL(fsName, ImageServlet.PATH_SPEC);
|
||||
long startTime = Time.monotonicNow();
|
||||
try {
|
||||
TransferFsImage.getFileClient(fsName, fileid, null, null, false);
|
||||
} catch (HttpGetFailedException e) {
|
||||
uploadImage(url, conf, storage, nnf, txid);
|
||||
} catch (HttpPutFailedException e) {
|
||||
if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
|
||||
// this is OK - this means that a previous attempt to upload
|
||||
// this checkpoint succeeded even though we thought it failed.
|
||||
@ -186,25 +213,105 @@ public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
LOG.info("Uploaded image with txid " + txid + " to namenode at " +
|
||||
fsName);
|
||||
double xferSec = Math.max(
|
||||
((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
|
||||
LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
|
||||
+ " in " + xferSec + " seconds");
|
||||
}
|
||||
|
||||
/*
|
||||
* Uploads the imagefile using HTTP PUT method
|
||||
*/
|
||||
private static void uploadImage(URL url, Configuration conf,
|
||||
NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
|
||||
|
||||
File imageFile = storage.findImageFile(nnf, txId);
|
||||
if (imageFile == null) {
|
||||
throw new IOException("Could not find image with txid " + txId);
|
||||
}
|
||||
|
||||
HttpURLConnection connection = null;
|
||||
try {
|
||||
URIBuilder uriBuilder = new URIBuilder(url.toURI());
|
||||
|
||||
// write all params for image upload request as query itself.
|
||||
// Request body contains the image to be uploaded.
|
||||
Map<String, String> params = ImageServlet.getParamsForPutImage(storage,
|
||||
txId, imageFile.length(), nnf);
|
||||
for (Entry<String, String> entry : params.entrySet()) {
|
||||
uriBuilder.addParameter(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
URL urlWithParams = uriBuilder.build().toURL();
|
||||
connection = (HttpURLConnection) connectionFactory.openConnection(
|
||||
urlWithParams, UserGroupInformation.isSecurityEnabled());
|
||||
// Set the request to PUT
|
||||
connection.setRequestMethod("PUT");
|
||||
connection.setDoOutput(true);
|
||||
|
||||
|
||||
int chunkSize = conf.getInt(
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);
|
||||
if (imageFile.length() > chunkSize) {
|
||||
// using chunked streaming mode to support upload of 2GB+ files and to
|
||||
// avoid internal buffering.
|
||||
// this mode should be used only if more than chunkSize data is present
|
||||
// to upload. otherwise upload may not happen sometimes.
|
||||
connection.setChunkedStreamingMode(chunkSize);
|
||||
}
|
||||
|
||||
setTimeout(connection);
|
||||
|
||||
// set headers for verification
|
||||
ImageServlet.setVerificationHeadersForPut(connection, imageFile);
|
||||
|
||||
// Write the file to output stream.
|
||||
writeFileToPutRequest(conf, connection, imageFile);
|
||||
|
||||
int responseCode = connection.getResponseCode();
|
||||
if (responseCode != HttpURLConnection.HTTP_OK) {
|
||||
throw new HttpPutFailedException(connection.getResponseMessage(),
|
||||
responseCode);
|
||||
}
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
connection.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void writeFileToPutRequest(Configuration conf,
|
||||
HttpURLConnection connection, File imageFile)
|
||||
throws FileNotFoundException, IOException {
|
||||
connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
|
||||
connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
|
||||
OutputStream output = connection.getOutputStream();
|
||||
FileInputStream input = new FileInputStream(imageFile);
|
||||
try {
|
||||
copyFileToStream(output, imageFile, input,
|
||||
ImageServlet.getThrottler(conf));
|
||||
} finally {
|
||||
IOUtils.closeStream(input);
|
||||
IOUtils.closeStream(output);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A server-side method to respond to a getfile http request
|
||||
* Copies the contents of the local file into the output stream.
|
||||
*/
|
||||
public static void getFileServer(ServletResponse response, File localfile,
|
||||
FileInputStream infile,
|
||||
DataTransferThrottler throttler)
|
||||
public static void copyFileToStream(OutputStream out, File localfile,
|
||||
FileInputStream infile, DataTransferThrottler throttler)
|
||||
throws IOException {
|
||||
byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
|
||||
ServletOutputStream out = null;
|
||||
try {
|
||||
CheckpointFaultInjector.getInstance()
|
||||
.aboutToSendFile(localfile);
|
||||
out = response.getOutputStream();
|
||||
|
||||
if (CheckpointFaultInjector.getInstance().
|
||||
shouldSendShortFile(localfile)) {
|
||||
@ -250,14 +357,13 @@ public static void getFileServer(ServletResponse response, File localfile,
|
||||
static MD5Hash getFileClient(URL infoServer,
|
||||
String queryString, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum) throws IOException {
|
||||
URL url = new URL(infoServer, "/getimage?" + queryString);
|
||||
URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
|
||||
LOG.info("Opening connection to " + url);
|
||||
return doGetUrl(url, localPaths, dstStorage, getChecksum);
|
||||
}
|
||||
|
||||
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum) throws IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
HttpURLConnection connection;
|
||||
try {
|
||||
connection = (HttpURLConnection)
|
||||
@ -266,16 +372,7 @@ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
if (timeout <= 0) {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
|
||||
}
|
||||
|
||||
if (timeout > 0) {
|
||||
connection.setConnectTimeout(timeout);
|
||||
connection.setReadTimeout(timeout);
|
||||
}
|
||||
setTimeout(connection);
|
||||
|
||||
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||
throw new HttpGetFailedException(
|
||||
@ -293,10 +390,37 @@ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
throw new IOException(CONTENT_LENGTH + " header is not provided " +
|
||||
"by the namenode when trying to fetch " + url);
|
||||
}
|
||||
|
||||
MD5Hash advertisedDigest = parseMD5Header(connection);
|
||||
String fsImageName = connection
|
||||
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
|
||||
InputStream stream = connection.getInputStream();
|
||||
|
||||
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
|
||||
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
|
||||
null);
|
||||
}
|
||||
|
||||
private static void setTimeout(HttpURLConnection connection) {
|
||||
if (timeout <= 0) {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
|
||||
LOG.info("Image Transfer timeout configured to " + timeout
|
||||
+ " milliseconds");
|
||||
}
|
||||
|
||||
if (timeout > 0) {
|
||||
connection.setConnectTimeout(timeout);
|
||||
connection.setReadTimeout(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
private static MD5Hash receiveFile(String url, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum, long advertisedSize,
|
||||
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
|
||||
DataTransferThrottler throttler) throws IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
if (localPaths != null) {
|
||||
String fsImageName = connection.getHeaderField(
|
||||
GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
|
||||
// If the local paths refer to directories, use the server-provided header
|
||||
// as the filename within that directory
|
||||
List<File> newLocalPaths = new ArrayList<File>();
|
||||
@ -313,10 +437,8 @@ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
localPaths = newLocalPaths;
|
||||
}
|
||||
|
||||
MD5Hash advertisedDigest = parseMD5Header(connection);
|
||||
|
||||
long received = 0;
|
||||
InputStream stream = connection.getInputStream();
|
||||
MessageDigest digester = null;
|
||||
if (getChecksum) {
|
||||
digester = MD5Hash.getDigester();
|
||||
@ -361,6 +483,9 @@ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
for (FileOutputStream fos : outputStreams) {
|
||||
fos.write(buf, 0, num);
|
||||
}
|
||||
if (throttler != null) {
|
||||
throttler.throttle(num);
|
||||
}
|
||||
}
|
||||
}
|
||||
finishedReceiving = true;
|
||||
@ -404,7 +529,12 @@ private static MD5Hash parseMD5Header(HttpURLConnection connection) {
|
||||
String header = connection.getHeaderField(MD5_HEADER);
|
||||
return (header != null) ? new MD5Hash(header) : null;
|
||||
}
|
||||
|
||||
|
||||
private static MD5Hash parseMD5Header(HttpServletRequest request) {
|
||||
String header = request.getHeader(MD5_HEADER);
|
||||
return (header != null) ? new MD5Hash(header) : null;
|
||||
}
|
||||
|
||||
public static class HttpGetFailedException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
private final int responseCode;
|
||||
@ -419,4 +549,18 @@ public int getResponseCode() {
|
||||
}
|
||||
}
|
||||
|
||||
public static class HttpPutFailedException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
private final int responseCode;
|
||||
|
||||
HttpPutFailedException(String msg, int responseCode) throws IOException {
|
||||
super(msg);
|
||||
this.responseCode = responseCode;
|
||||
}
|
||||
|
||||
public int getResponseCode() {
|
||||
return responseCode;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -63,6 +63,7 @@ public class StandbyCheckpointer {
|
||||
private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
|
||||
private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
|
||||
private final CheckpointConf checkpointConf;
|
||||
private final Configuration conf;
|
||||
private final FSNamesystem namesystem;
|
||||
private long lastCheckpointTime;
|
||||
private final CheckpointerThread thread;
|
||||
@ -80,6 +81,7 @@ public class StandbyCheckpointer {
|
||||
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
|
||||
throws IOException {
|
||||
this.namesystem = ns;
|
||||
this.conf = conf;
|
||||
this.checkpointConf = new CheckpointConf(conf);
|
||||
this.thread = new CheckpointerThread();
|
||||
this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
|
||||
@ -193,7 +195,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
|
||||
Future<Void> upload = executor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
|
||||
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
|
||||
namesystem.getFSImage().getStorage(), imageType, txid);
|
||||
return null;
|
||||
}
|
||||
|
@ -858,15 +858,13 @@
|
||||
|
||||
<property>
|
||||
<name>dfs.image.transfer.timeout</name>
|
||||
<value>600000</value>
|
||||
<value>60000</value>
|
||||
<description>
|
||||
Timeout for image transfer in milliseconds. This timeout and the related
|
||||
Socket timeout for image transfer in milliseconds. This timeout and the related
|
||||
dfs.image.transfer.bandwidthPerSec parameter should be configured such
|
||||
that normal image transfer can complete within the timeout.
|
||||
that normal image transfer can complete successfully.
|
||||
This timeout prevents client hangs when the sender fails during
|
||||
image transfer, which is particularly important during checkpointing.
|
||||
Note that this timeout applies to the entirety of image transfer, and
|
||||
is not a socket timeout.
|
||||
image transfer. This is socket timeout during image tranfer.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
@ -883,6 +881,16 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.image.transfer.chunksize</name>
|
||||
<value>65536</value>
|
||||
<description>
|
||||
Chunksize in bytes to upload the checkpoint.
|
||||
Chunked streaming is used to avoid internal buffering of contents
|
||||
of image file of huge size.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.support.allow.format</name>
|
||||
<value>true</value>
|
||||
|
@ -31,6 +31,7 @@
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
@ -554,23 +555,9 @@ public void testSecondaryNamenodeError3() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Simulate a secondary node failure to transfer image
|
||||
* back to the name-node.
|
||||
* Used to truncate primary fsimage file.
|
||||
*/
|
||||
@Test
|
||||
public void testSecondaryFailsToReturnImage() throws IOException {
|
||||
Mockito.doThrow(new IOException("If this exception is not caught by the " +
|
||||
"name-node, fs image will be truncated."))
|
||||
.when(faultInjector).aboutToSendFile(filePathContaining("secondary"));
|
||||
|
||||
doSecondaryFailsToReturnImage();
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar to above test, but uses an unchecked Error, and causes it
|
||||
* before even setting the length header. This used to cause image
|
||||
* truncation. Regression test for HDFS-3330.
|
||||
* Simulate a secondary node failure to transfer image. Uses an unchecked
|
||||
* error and fail transfer before even setting the length header. This used to
|
||||
* cause image truncation. Regression test for HDFS-3330.
|
||||
*/
|
||||
@Test
|
||||
public void testSecondaryFailsWithErrorBeforeSettingHeaders()
|
||||
@ -1975,7 +1962,14 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException {
|
||||
Mockito.doReturn(Lists.newArrayList(new File("/wont-be-written")))
|
||||
.when(dstImage).getFiles(
|
||||
Mockito.<NameNodeDirType>anyObject(), Mockito.anyString());
|
||||
|
||||
|
||||
File mockImageFile = File.createTempFile("image", "");
|
||||
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
|
||||
imageFile.write("data".getBytes());
|
||||
imageFile.close();
|
||||
Mockito.doReturn(mockImageFile).when(dstImage)
|
||||
.findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong());
|
||||
|
||||
Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString())
|
||||
.when(dstImage).toColonSeparatedString();
|
||||
|
||||
@ -1996,8 +1990,8 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException {
|
||||
}
|
||||
|
||||
try {
|
||||
TransferFsImage.uploadImageFromStorage(fsName, new URL(
|
||||
"http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0);
|
||||
TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage,
|
||||
NameNodeFile.IMAGE, 0);
|
||||
fail("Storage info was not verified");
|
||||
} catch (IOException ioe) {
|
||||
String msg = StringUtils.stringifyException(ioe);
|
||||
|
@ -69,7 +69,7 @@ public void testIsValidRequestor() throws IOException {
|
||||
Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls);
|
||||
|
||||
// Make sure that NN2 is considered a valid fsimage/edits requestor.
|
||||
assertTrue(GetImageServlet.isValidRequestor(context,
|
||||
assertTrue(ImageServlet.isValidRequestor(context,
|
||||
"hdfs/host2@TEST-REALM.COM", conf));
|
||||
|
||||
// Mark atm as an admin.
|
||||
@ -81,15 +81,15 @@ public boolean matches(Object argument) {
|
||||
}))).thenReturn(true);
|
||||
|
||||
// Make sure that NN2 is still considered a valid requestor.
|
||||
assertTrue(GetImageServlet.isValidRequestor(context,
|
||||
assertTrue(ImageServlet.isValidRequestor(context,
|
||||
"hdfs/host2@TEST-REALM.COM", conf));
|
||||
|
||||
// Make sure an admin is considered a valid requestor.
|
||||
assertTrue(GetImageServlet.isValidRequestor(context,
|
||||
assertTrue(ImageServlet.isValidRequestor(context,
|
||||
"atm@TEST-REALM.COM", conf));
|
||||
|
||||
// Make sure other users are *not* considered valid requestors.
|
||||
assertFalse(GetImageServlet.isValidRequestor(context,
|
||||
assertFalse(ImageServlet.isValidRequestor(context,
|
||||
"todd@TEST-REALM.COM", conf));
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URL;
|
||||
@ -34,9 +35,11 @@
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
import org.apache.hadoop.http.HttpServerFunctionalTest;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
@ -118,10 +121,11 @@ public void testClientSideExceptionOnJustOneDir() throws IOException {
|
||||
* Test to verify the read timeout
|
||||
*/
|
||||
@Test(timeout = 5000)
|
||||
public void testImageTransferTimeout() throws Exception {
|
||||
public void testGetImageTimeout() throws Exception {
|
||||
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
|
||||
try {
|
||||
testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class);
|
||||
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
|
||||
TestImageTransferServlet.class);
|
||||
testServer.start();
|
||||
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
|
||||
TransferFsImage.timeout = 2000;
|
||||
@ -139,7 +143,48 @@ public void testImageTransferTimeout() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestGetImageServlet extends HttpServlet {
|
||||
/**
|
||||
* Test to verify the timeout of Image upload
|
||||
*/
|
||||
@Test(timeout = 10000)
|
||||
public void testImageUploadTimeout() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
NNStorage mockStorage = Mockito.mock(NNStorage.class);
|
||||
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
|
||||
try {
|
||||
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
|
||||
TestImageTransferServlet.class);
|
||||
testServer.start();
|
||||
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
|
||||
// set the timeout here, otherwise it will take default.
|
||||
TransferFsImage.timeout = 2000;
|
||||
|
||||
File tmpDir = new File(new FileSystemTestHelper().getTestRootDir());
|
||||
tmpDir.mkdirs();
|
||||
|
||||
File mockImageFile = File.createTempFile("image", "", tmpDir);
|
||||
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
|
||||
imageFile.write("data".getBytes());
|
||||
imageFile.close();
|
||||
Mockito.when(
|
||||
mockStorage.findImageFile(Mockito.any(NameNodeFile.class),
|
||||
Mockito.anyLong())).thenReturn(mockImageFile);
|
||||
Mockito.when(mockStorage.toColonSeparatedString()).thenReturn(
|
||||
"storage:info:string");
|
||||
|
||||
try {
|
||||
TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage,
|
||||
NameNodeFile.IMAGE, 1L);
|
||||
fail("TransferImage Should fail with timeout");
|
||||
} catch (SocketTimeoutException e) {
|
||||
assertEquals("Upload should timeout", "Read timed out", e.getMessage());
|
||||
}
|
||||
} finally {
|
||||
testServer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestImageTransferServlet extends HttpServlet {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
@ -153,5 +198,17 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPut(HttpServletRequest req, HttpServletResponse resp)
|
||||
throws ServletException, IOException {
|
||||
synchronized (this) {
|
||||
try {
|
||||
wait(5000);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user