HDFS-5978. Create a tool to take fsimage and expose read-only WebHDFS API. Contributed by Akira Ajisaka.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1582433 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
51e353c091
commit
54c1daa580
@ -266,7 +266,10 @@ Release 2.5.0 - UNRELEASED
|
||||
|
||||
HDFS-6119. FSNamesystem code cleanup. (suresh)
|
||||
|
||||
HDFS-6158. Clean up dead code for OfflineImageViewer (wheat9)
|
||||
HDFS-6158. Clean up dead code for OfflineImageViewer. (wheat9)
|
||||
|
||||
HDFS-5978. Create a tool to take fsimage and expose read-only WebHDFS API.
|
||||
(Akira Ajisaka via wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
@ -179,7 +179,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
<scope>test</scope>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
@ -0,0 +1,95 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
|
||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpRequest;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.jboss.netty.handler.codec.http.HttpVersion;
|
||||
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
|
||||
|
||||
/**
|
||||
* Implement the read-only WebHDFS API for fsimage.
|
||||
*/
|
||||
public class FSImageHandler extends SimpleChannelUpstreamHandler {
|
||||
public static final Log LOG = LogFactory.getLog(FSImageHandler.class);
|
||||
private final FSImageLoader loader;
|
||||
|
||||
public FSImageHandler(FSImageLoader loader) throws IOException {
|
||||
this.loader = loader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(
|
||||
ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
HttpRequest request = (HttpRequest) e.getMessage();
|
||||
if (request.getMethod() == HttpMethod.GET){
|
||||
String uri = request.getUri();
|
||||
QueryStringDecoder decoder = new QueryStringDecoder(uri);
|
||||
|
||||
String op = "null";
|
||||
if (decoder.getParameters().containsKey("op")) {
|
||||
op = decoder.getParameters().get("op").get(0).toUpperCase();
|
||||
}
|
||||
HttpResponse response = new DefaultHttpResponse(
|
||||
HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
|
||||
String json = null;
|
||||
|
||||
if (op.equals("LISTSTATUS")) {
|
||||
try {
|
||||
json = loader.listStatus(decoder.getPath());
|
||||
response.setStatus(HttpResponseStatus.OK);
|
||||
response.setHeader(HttpHeaders.Names.CONTENT_TYPE,
|
||||
"application/json");
|
||||
HttpHeaders.setContentLength(response, json.length());
|
||||
} catch (Exception ex) {
|
||||
LOG.warn(ex.getMessage());
|
||||
response.setStatus(HttpResponseStatus.NOT_FOUND);
|
||||
}
|
||||
} else {
|
||||
response.setStatus(HttpResponseStatus.BAD_REQUEST);
|
||||
}
|
||||
|
||||
e.getChannel().write(response);
|
||||
if (json != null) {
|
||||
e.getChannel().write(json);
|
||||
}
|
||||
LOG.info(response.getStatus().getCode() + " method=GET op=" + op
|
||||
+ " target=" + decoder.getPath());
|
||||
} else {
|
||||
// only HTTP GET is allowed since fsimage is read-only.
|
||||
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
|
||||
HttpResponseStatus.METHOD_NOT_ALLOWED);
|
||||
e.getChannel().write(response);
|
||||
LOG.info(response.getStatus().getCode() + " method="
|
||||
+ request.getMethod().getName());
|
||||
}
|
||||
e.getFuture().addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
}
|
@ -0,0 +1,369 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.LimitInputStream;
|
||||
|
||||
/**
|
||||
* FSImageLoader loads fsimage and provide methods to return JSON formatted
|
||||
* file status of the namespace of the fsimage.
|
||||
*/
|
||||
public class FSImageLoader {
|
||||
public static final Log LOG = LogFactory.getLog(FSImageHandler.class);
|
||||
|
||||
private static String[] stringTable;
|
||||
private static Map<Long, FsImageProto.INodeSection.INode> inodes =
|
||||
Maps.newHashMap();
|
||||
private static Map<Long, long[]> dirmap = Maps.newHashMap();
|
||||
private static List<FsImageProto.INodeReferenceSection.INodeReference>
|
||||
refList = Lists.newArrayList();
|
||||
|
||||
private FSImageLoader() {}
|
||||
|
||||
/**
|
||||
* Load fsimage into the memory.
|
||||
* @param inputFile the filepath of the fsimage to load.
|
||||
* @return FSImageLoader
|
||||
* @throws IOException if failed to load fsimage.
|
||||
*/
|
||||
public static FSImageLoader load(String inputFile) throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
RandomAccessFile file = new RandomAccessFile(inputFile, "r");
|
||||
if (!FSImageUtil.checkFileFormat(file)) {
|
||||
throw new IOException("Unrecognized FSImage");
|
||||
}
|
||||
|
||||
FsImageProto.FileSummary summary = FSImageUtil.loadSummary(file);
|
||||
FileInputStream fin = null;
|
||||
try {
|
||||
fin = new FileInputStream(file.getFD());
|
||||
|
||||
ArrayList<FsImageProto.FileSummary.Section> sections =
|
||||
Lists.newArrayList(summary.getSectionsList());
|
||||
Collections.sort(sections,
|
||||
new Comparator<FsImageProto.FileSummary.Section>() {
|
||||
@Override
|
||||
public int compare(FsImageProto.FileSummary.Section s1,
|
||||
FsImageProto.FileSummary.Section s2) {
|
||||
FSImageFormatProtobuf.SectionName n1 =
|
||||
FSImageFormatProtobuf.SectionName.fromString(s1.getName());
|
||||
FSImageFormatProtobuf.SectionName n2 =
|
||||
FSImageFormatProtobuf.SectionName.fromString(s2.getName());
|
||||
if (n1 == null) {
|
||||
return n2 == null ? 0 : -1;
|
||||
} else if (n2 == null) {
|
||||
return -1;
|
||||
} else {
|
||||
return n1.ordinal() - n2.ordinal();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (FsImageProto.FileSummary.Section s : sections) {
|
||||
fin.getChannel().position(s.getOffset());
|
||||
InputStream is = FSImageUtil.wrapInputStreamForCompression(conf,
|
||||
summary.getCodec(), new BufferedInputStream(new LimitInputStream(
|
||||
fin, s.getLength())));
|
||||
|
||||
switch (FSImageFormatProtobuf.SectionName.fromString(s.getName())) {
|
||||
case STRING_TABLE:
|
||||
loadStringTable(is);
|
||||
break;
|
||||
case INODE:
|
||||
loadINodeSection(is);
|
||||
break;
|
||||
case INODE_REFERENCE:
|
||||
loadINodeReferenceSection(is);
|
||||
break;
|
||||
case INODE_DIR:
|
||||
loadINodeDirectorySection(is);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
IOUtils.cleanup(null, fin);
|
||||
}
|
||||
return new FSImageLoader();
|
||||
}
|
||||
|
||||
private static void loadINodeDirectorySection(InputStream in)
|
||||
throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Loading directory section");
|
||||
}
|
||||
while (true) {
|
||||
FsImageProto.INodeDirectorySection.DirEntry e =
|
||||
FsImageProto.INodeDirectorySection.DirEntry.parseDelimitedFrom(in);
|
||||
// note that in is a LimitedInputStream
|
||||
if (e == null) {
|
||||
break;
|
||||
}
|
||||
long[] l = new long[e.getChildrenCount() + e.getRefChildrenCount()];
|
||||
for (int i = 0; i < e.getChildrenCount(); ++i) {
|
||||
l[i] = e.getChildren(i);
|
||||
}
|
||||
for (int i = e.getChildrenCount(); i < l.length; i++) {
|
||||
int refId = e.getRefChildren(i - e.getChildrenCount());
|
||||
l[i] = refList.get(refId).getReferredId();
|
||||
}
|
||||
dirmap.put(e.getParent(), l);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Loaded directory (parent " + e.getParent()
|
||||
+ ") with " + e.getChildrenCount() + " children and "
|
||||
+ e.getRefChildrenCount() + " reference children");
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Loaded " + dirmap.size() + " directories");
|
||||
}
|
||||
}
|
||||
|
||||
private static void loadINodeReferenceSection(InputStream in)
|
||||
throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Loading inode reference section");
|
||||
}
|
||||
while (true) {
|
||||
FsImageProto.INodeReferenceSection.INodeReference e =
|
||||
FsImageProto.INodeReferenceSection.INodeReference
|
||||
.parseDelimitedFrom(in);
|
||||
if (e == null) {
|
||||
break;
|
||||
}
|
||||
refList.add(e);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Loaded inode reference named '" + e.getName()
|
||||
+ "' referring to id " + e.getReferredId() + "");
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Loaded " + refList.size() + " inode references");
|
||||
}
|
||||
}
|
||||
|
||||
private static void loadINodeSection(InputStream in) throws IOException {
|
||||
FsImageProto.INodeSection s = FsImageProto.INodeSection
|
||||
.parseDelimitedFrom(in);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found " + s.getNumInodes() + " inodes in inode section");
|
||||
}
|
||||
for (int i = 0; i < s.getNumInodes(); ++i) {
|
||||
FsImageProto.INodeSection.INode p = FsImageProto.INodeSection.INode
|
||||
.parseDelimitedFrom(in);
|
||||
inodes.put(p.getId(), p);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Loaded inode id " + p.getId() + " type " + p.getType()
|
||||
+ " name '" + p.getName().toStringUtf8() + "'");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void loadStringTable(InputStream in) throws IOException {
|
||||
FsImageProto.StringTableSection s = FsImageProto.StringTableSection
|
||||
.parseDelimitedFrom(in);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found " + s.getNumEntry() + " strings in string section");
|
||||
}
|
||||
stringTable = new String[s.getNumEntry() + 1];
|
||||
for (int i = 0; i < s.getNumEntry(); ++i) {
|
||||
FsImageProto.StringTableSection.Entry e = FsImageProto
|
||||
.StringTableSection.Entry.parseDelimitedFrom(in);
|
||||
stringTable[e.getId()] = e.getStr();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Loaded string " + e.getStr());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the JSON formatted list of the files in the specified directory.
|
||||
* @param path a path specifies a directory to list
|
||||
* @return JSON formatted file list in the directory
|
||||
* @throws IOException if failed to serialize fileStatus to JSON.
|
||||
*/
|
||||
public String listStatus(String path) throws IOException {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
List<Map<String, Object>> fileStatusList = getFileStatusList(path);
|
||||
sb.append("{\"FileStatuses\":{\"FileStatus\":[\n");
|
||||
int i = 0;
|
||||
for (Map<String, Object> fileStatusMap : fileStatusList) {
|
||||
if (i++ != 0) {
|
||||
sb.append(',');
|
||||
}
|
||||
sb.append(mapper.writeValueAsString(fileStatusMap));
|
||||
}
|
||||
sb.append("\n]}}\n");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private List<Map<String, Object>> getFileStatusList(String path) {
|
||||
List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
|
||||
long id = getINodeId(path);
|
||||
FsImageProto.INodeSection.INode inode = inodes.get(id);
|
||||
if (inode.getType() == FsImageProto.INodeSection.INode.Type.DIRECTORY) {
|
||||
long[] children = dirmap.get(id);
|
||||
for (long cid : children) {
|
||||
list.add(getFileStatus(inodes.get(cid), true));
|
||||
}
|
||||
} else {
|
||||
list.add(getFileStatus(inode, false));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the INodeId of the specified path.
|
||||
*/
|
||||
private long getINodeId(String strPath) {
|
||||
if (strPath.equals("/")) {
|
||||
return INodeId.ROOT_INODE_ID;
|
||||
}
|
||||
|
||||
String[] nameList = strPath.split("/");
|
||||
Preconditions.checkArgument(nameList.length > 1,
|
||||
"Illegal path: " + strPath);
|
||||
long id = INodeId.ROOT_INODE_ID;
|
||||
for (int i = 1; i < nameList.length; i++) {
|
||||
long[] children = dirmap.get(id);
|
||||
Preconditions.checkNotNull(children, "The specified path: " +
|
||||
strPath + " is not found in the fsimage.");
|
||||
String cName = nameList[i];
|
||||
boolean findChildren = false;
|
||||
for (long cid : children) {
|
||||
if (cName.equals(inodes.get(cid).getName().toStringUtf8())) {
|
||||
id = cid;
|
||||
findChildren = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Preconditions.checkArgument(findChildren, "The specified path: " +
|
||||
strPath + " is not found in the fsimage.");
|
||||
}
|
||||
return id;
|
||||
}
|
||||
|
||||
private Map<String, Object> getFileStatus
|
||||
(FsImageProto.INodeSection.INode inode, boolean printSuffix){
|
||||
Map<String, Object> map = Maps.newHashMap();
|
||||
switch (inode.getType()) {
|
||||
case FILE: {
|
||||
FsImageProto.INodeSection.INodeFile f = inode.getFile();
|
||||
PermissionStatus p = FSImageFormatPBINode.Loader.loadPermission(
|
||||
f.getPermission(), stringTable);
|
||||
map.put("accessTime", f.getAccessTime());
|
||||
map.put("blockSize", f.getPreferredBlockSize());
|
||||
map.put("group", p.getGroupName());
|
||||
map.put("length", getFileSize(f));
|
||||
map.put("modificationTime", f.getModificationTime());
|
||||
map.put("owner", p.getUserName());
|
||||
map.put("pathSuffix",
|
||||
printSuffix ? inode.getName().toStringUtf8() : "");
|
||||
map.put("permission", toString(p.getPermission()));
|
||||
map.put("replication", f.getReplication());
|
||||
map.put("type", inode.getType());
|
||||
map.put("fileId", inode.getId());
|
||||
map.put("childrenNum", 0);
|
||||
return map;
|
||||
}
|
||||
case DIRECTORY: {
|
||||
FsImageProto.INodeSection.INodeDirectory d = inode.getDirectory();
|
||||
PermissionStatus p = FSImageFormatPBINode.Loader.loadPermission(
|
||||
d.getPermission(), stringTable);
|
||||
map.put("accessTime", 0);
|
||||
map.put("blockSize", 0);
|
||||
map.put("group", p.getGroupName());
|
||||
map.put("length", 0);
|
||||
map.put("modificationTime", d.getModificationTime());
|
||||
map.put("owner", p.getUserName());
|
||||
map.put("pathSuffix",
|
||||
printSuffix ? inode.getName().toStringUtf8() : "");
|
||||
map.put("permission", toString(p.getPermission()));
|
||||
map.put("replication", 0);
|
||||
map.put("type", inode.getType());
|
||||
map.put("fileId", inode.getId());
|
||||
map.put("childrenNum", dirmap.get(inode.getId()).length);
|
||||
return map;
|
||||
}
|
||||
case SYMLINK: {
|
||||
FsImageProto.INodeSection.INodeSymlink d = inode.getSymlink();
|
||||
PermissionStatus p = FSImageFormatPBINode.Loader.loadPermission(
|
||||
d.getPermission(), stringTable);
|
||||
map.put("accessTime", d.getAccessTime());
|
||||
map.put("blockSize", 0);
|
||||
map.put("group", p.getGroupName());
|
||||
map.put("length", 0);
|
||||
map.put("modificationTime", d.getModificationTime());
|
||||
map.put("owner", p.getUserName());
|
||||
map.put("pathSuffix",
|
||||
printSuffix ? inode.getName().toStringUtf8() : "");
|
||||
map.put("permission", toString(p.getPermission()));
|
||||
map.put("replication", 0);
|
||||
map.put("type", inode.getType());
|
||||
map.put("symlink", d.getTarget().toStringUtf8());
|
||||
map.put("fileId", inode.getId());
|
||||
map.put("childrenNum", 0);
|
||||
return map;
|
||||
}
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private long getFileSize(FsImageProto.INodeSection.INodeFile f) {
|
||||
long size = 0;
|
||||
for (HdfsProtos.BlockProto p : f.getBlocksList()) {
|
||||
size += p.getNumBytes();
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
private String toString(FsPermission permission) {
|
||||
return String.format("%o", permission.toShort());
|
||||
}
|
||||
}
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
||||
/**
|
||||
* OfflineImageViewerPB to dump the contents of an Hadoop image file to XML or
|
||||
@ -69,6 +70,8 @@ public class OfflineImageViewerPB {
|
||||
+ " -maxSize specifies the range [0, maxSize] of file sizes to be\n"
|
||||
+ " analyzed (128GB by default).\n"
|
||||
+ " -step defines the granularity of the distribution. (2MB by default)\n"
|
||||
+ " * Web: Run a viewer to expose read-only WebHDFS API.\n"
|
||||
+ " -addr specifies the address to listen. (localhost:5978 by default)\n"
|
||||
+ "\n"
|
||||
+ "Required command line arguments:\n"
|
||||
+ "-i,--inputFile <arg> FSImage file to process.\n"
|
||||
@ -103,6 +106,7 @@ private static Options buildOptions() {
|
||||
options.addOption("h", "help", false, "");
|
||||
options.addOption("maxSize", true, "");
|
||||
options.addOption("step", true, "");
|
||||
options.addOption("addr", true, "");
|
||||
|
||||
return options;
|
||||
}
|
||||
@ -161,6 +165,10 @@ public static int run(String[] args) throws IOException {
|
||||
} else if (processor.equals("XML")) {
|
||||
new PBImageXmlWriter(conf, out).visit(new RandomAccessFile(inputFile,
|
||||
"r"));
|
||||
} else if (processor.equals("Web")) {
|
||||
String addr = cmd.getOptionValue("addr", "localhost:5978");
|
||||
new WebImageViewer(NetUtils.createSocketAddr(addr))
|
||||
.initServerAndWait(inputFile);
|
||||
} else {
|
||||
new LsrPBImage(conf, out).visit(new RandomAccessFile(inputFile, "r"));
|
||||
}
|
||||
|
@ -0,0 +1,126 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.channel.group.ChannelGroup;
|
||||
import org.jboss.netty.channel.group.DefaultChannelGroup;
|
||||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
|
||||
import org.jboss.netty.handler.codec.string.StringEncoder;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* WebImageViewer loads a fsimage and exposes read-only WebHDFS API for its
|
||||
* namespace.
|
||||
*/
|
||||
public class WebImageViewer {
|
||||
public static final Log LOG = LogFactory.getLog(WebImageViewer.class);
|
||||
|
||||
private Channel channel;
|
||||
private InetSocketAddress address;
|
||||
private final ChannelFactory factory =
|
||||
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
|
||||
Executors.newCachedThreadPool(), 1);
|
||||
private final ServerBootstrap bootstrap = new ServerBootstrap(factory);
|
||||
|
||||
static final ChannelGroup allChannels =
|
||||
new DefaultChannelGroup("WebImageViewer");
|
||||
|
||||
public WebImageViewer(InetSocketAddress address) {
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start WebImageViewer and wait until the thread is interrupted.
|
||||
* @param fsimage the fsimage to load.
|
||||
* @throws IOException if failed to load the fsimage.
|
||||
*/
|
||||
public void initServerAndWait(String fsimage) throws IOException {
|
||||
initServer(fsimage);
|
||||
try {
|
||||
channel.getCloseFuture().await();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted. Stopping the WebImageViewer.");
|
||||
shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start WebImageViewer.
|
||||
* @param fsimage the fsimage to load.
|
||||
* @throws IOException if fail to load the fsimage.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void initServer(String fsimage) throws IOException {
|
||||
FSImageLoader loader = FSImageLoader.load(fsimage);
|
||||
|
||||
ChannelPipeline pipeline = Channels.pipeline();
|
||||
pipeline.addLast("channelTracker", new SimpleChannelUpstreamHandler() {
|
||||
@Override
|
||||
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||
throws Exception {
|
||||
allChannels.add(e.getChannel());
|
||||
}
|
||||
});
|
||||
pipeline.addLast("httpDecoder", new HttpRequestDecoder());
|
||||
pipeline.addLast("requestHandler", new FSImageHandler(loader));
|
||||
pipeline.addLast("stringEncoder", new StringEncoder());
|
||||
pipeline.addLast("httpEncoder", new HttpResponseEncoder());
|
||||
bootstrap.setPipeline(pipeline);
|
||||
channel = bootstrap.bind(address);
|
||||
allChannels.add(channel);
|
||||
|
||||
address = (InetSocketAddress) channel.getLocalAddress();
|
||||
LOG.info("WebImageViewer started. Listening on " + address.toString()
|
||||
+ ". Press Ctrl+C to stop the viewer.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop WebImageViewer.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void shutdown() {
|
||||
allChannels.close().awaitUninterruptibly();
|
||||
factory.releaseExternalResources();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the listening port.
|
||||
* @return the port WebImageViewer is listening on
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public int getPort() {
|
||||
return address.getPort();
|
||||
}
|
||||
}
|
@ -28,9 +28,13 @@
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@ -52,8 +56,12 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.type.TypeReference;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -289,4 +297,66 @@ public void testPBImageXmlWriter() throws IOException, SAXException,
|
||||
final String xml = output.getBuffer().toString();
|
||||
parser.parse(new InputSource(new StringReader(xml)), new DefaultHandler());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWebImageViewer() throws IOException, InterruptedException {
|
||||
WebImageViewer viewer = new WebImageViewer(
|
||||
NetUtils.createSocketAddr("localhost:0"));
|
||||
try {
|
||||
viewer.initServer(originalFsimage.getAbsolutePath());
|
||||
int port = viewer.getPort();
|
||||
|
||||
// 1. LISTSTATUS operation to a valid path
|
||||
URL url = new URL("http://localhost:" + port + "/?op=LISTSTATUS");
|
||||
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
||||
connection.setRequestMethod("GET");
|
||||
connection.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_OK, connection.getResponseCode());
|
||||
assertEquals("application/json", connection.getContentType());
|
||||
|
||||
String content = org.apache.commons.io.IOUtils.toString(
|
||||
connection.getInputStream());
|
||||
LOG.info("content: " + content);
|
||||
|
||||
// verify the number of directories listed
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
Map<String, Map<String, List<Map<String, Object>>>> fileStatuses =
|
||||
mapper.readValue(content, new TypeReference
|
||||
<Map<String, Map<String, List<Map<String, Object>>>>>(){});
|
||||
List<Map<String, Object>> fileStatusList = fileStatuses
|
||||
.get("FileStatuses").get("FileStatus");
|
||||
assertEquals(NUM_DIRS, fileStatusList.size());
|
||||
|
||||
// verify the number of files in a directory
|
||||
Map<String, Object> fileStatusMap = fileStatusList.get(0);
|
||||
assertEquals(FILES_PER_DIR, fileStatusMap.get("childrenNum"));
|
||||
|
||||
// 2. LISTSTATUS operation to a invalid path
|
||||
url = new URL("http://localhost:" + port + "/invalid/?op=LISTSTATUS");
|
||||
connection = (HttpURLConnection) url.openConnection();
|
||||
connection.setRequestMethod("GET");
|
||||
connection.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_NOT_FOUND,
|
||||
connection.getResponseCode());
|
||||
|
||||
// 3. invalid operation
|
||||
url = new URL("http://localhost:" + port + "/?op=INVALID");
|
||||
connection = (HttpURLConnection) url.openConnection();
|
||||
connection.setRequestMethod("GET");
|
||||
connection.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_BAD_REQUEST,
|
||||
connection.getResponseCode());
|
||||
|
||||
// 4. invalid method
|
||||
url = new URL("http://localhost:" + port + "/?op=LISTSTATUS");
|
||||
connection = (HttpURLConnection) url.openConnection();
|
||||
connection.setRequestMethod("POST");
|
||||
connection.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_BAD_METHOD,
|
||||
connection.getResponseCode());
|
||||
} finally {
|
||||
// shutdown the viewer
|
||||
viewer.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user