HDFS-7280. Use netty 4 in WebImageViewer. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-10-28 16:53:53 -07:00
parent ac9ab037e9
commit 675bca2968
8 changed files with 155 additions and 132 deletions

View File

@ -318,6 +318,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7213. processIncrementalBlockReport performance degradation.
(Eric Payne via kihwal)
HDFS-7280. Use netty 4 in WebImageViewer. (wheat9)
OPTIMIZATIONS
BUG FIXES

View File

@ -173,7 +173,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
<dependency>

View File

@ -22,105 +22,118 @@
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
import javax.management.Query;
/**
* Implement the read-only WebHDFS API for fsimage.
*/
class FSImageHandler extends SimpleChannelUpstreamHandler {
class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
public static final Log LOG = LogFactory.getLog(FSImageHandler.class);
private final FSImageLoader image;
private final ChannelGroup activeChannels;
FSImageHandler(FSImageLoader image) throws IOException {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
activeChannels.add(ctx.channel());
}
FSImageHandler(FSImageLoader image, ChannelGroup activeChannels) throws IOException {
this.image = image;
this.activeChannels = activeChannels;
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelFuture future = e.getFuture();
try {
future = handleOperation(e);
} finally {
future.addListener(ChannelFutureListener.CLOSE);
}
}
private ChannelFuture handleOperation(MessageEvent e)
throws IOException {
HttpRequest request = (HttpRequest) e.getMessage();
HttpResponse response = new DefaultHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json");
public void channelRead0(ChannelHandlerContext ctx, HttpRequest request)
throws Exception {
if (request.getMethod() != HttpMethod.GET) {
response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
return e.getChannel().write(response);
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
METHOD_NOT_ALLOWED);
resp.headers().set("Connection", "close");
ctx.write(resp).addListener(ChannelFutureListener.CLOSE);
return;
}
QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
final String op = getOp(decoder);
String content;
String path = null;
try {
path = getPath(decoder);
if ("GETFILESTATUS".equals(op)) {
content = image.getFileStatus(path);
} else if ("LISTSTATUS".equals(op)) {
content = image.listStatus(path);
} else if ("GETACLSTATUS".equals(op)) {
content = image.getAclStatus(path);
} else {
throw new IllegalArgumentException("Invalid value for webhdfs parameter" + " \"op\"");
}
} catch (IllegalArgumentException ex) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
content = JsonUtil.toJsonString(ex);
} catch (FileNotFoundException ex) {
response.setStatus(HttpResponseStatus.NOT_FOUND);
content = JsonUtil.toJsonString(ex);
} catch (Exception ex) {
content = JsonUtil.toJsonString(ex);
final String content;
String path = getPath(decoder);
if ("GETFILESTATUS".equals(op)) {
content = image.getFileStatus(path);
} else if ("LISTSTATUS".equals(op)) {
content = image.listStatus(path);
} else if ("GETACLSTATUS".equals(op)) {
content = image.getAclStatus(path);
} else {
throw new IllegalArgumentException("Invalid value for webhdfs parameter" + " \"op\"");
}
HttpHeaders.setContentLength(response, content.length());
e.getChannel().write(response);
ChannelFuture future = e.getChannel().write(content);
LOG.info("op=" + op + " target=" + path);
LOG.info(response.getStatus().getCode() + " method="
+ request.getMethod().getName() + " op=" + op + " target=" + path);
DefaultFullHttpResponse resp = new DefaultFullHttpResponse(
HTTP_1_1, HttpResponseStatus.OK,
Unpooled.wrappedBuffer(content.getBytes()));
resp.headers().set("Content-Type", "application/json");
resp.headers().set("Content-Length", resp.content().readableBytes());
resp.headers().set("Connection", "close");
ctx.write(resp).addListener(ChannelFutureListener.CLOSE);
}
return future;
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Exception e = cause instanceof Exception ? (Exception) cause : new
Exception(cause);
final String output = JsonUtil.toJsonString(e);
ByteBuf content = Unpooled.wrappedBuffer(output.getBytes());
final DefaultFullHttpResponse resp = new DefaultFullHttpResponse(
HTTP_1_1, INTERNAL_SERVER_ERROR, content);
resp.headers().set("Content-Type", "application/json");
if (e instanceof IllegalArgumentException) {
resp.setStatus(BAD_REQUEST);
} else if (e instanceof FileNotFoundException) {
resp.setStatus(NOT_FOUND);
}
resp.headers().set("Content-Length", resp.content().readableBytes());
resp.headers().set("Connection", "close");
ctx.write(resp).addListener(ChannelFutureListener.CLOSE);
}
private static String getOp(QueryStringDecoder decoder) {
Map<String, List<String>> parameters = decoder.getParameters();
Map<String, List<String>> parameters = decoder.parameters();
return parameters.containsKey("op")
? parameters.get("op").get(0).toUpperCase() : null;
}
private static String getPath(QueryStringDecoder decoder)
throws FileNotFoundException {
String path = decoder.getPath();
String path = decoder.path();
if (path.startsWith("/webhdfs/v1/")) {
return path.substring(11);
} else {

View File

@ -112,12 +112,12 @@ private static Options buildOptions() {
* Command line options
* @throws IOException
*/
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws Exception {
int status = run(args);
System.exit(status);
}
public static int run(String[] args) throws IOException {
public static int run(String[] args) throws Exception {
Options options = buildOptions();
if (args.length == 0) {
printUsage();
@ -159,8 +159,13 @@ public static int run(String[] args) throws IOException {
"r"));
} else if (processor.equals("Web")) {
String addr = cmd.getOptionValue("addr", "localhost:5978");
new WebImageViewer(NetUtils.createSocketAddr(addr))
.initServerAndWait(inputFile);
WebImageViewer viewer = new WebImageViewer(NetUtils.createSocketAddr
(addr));
try {
viewer.start(inputFile);
} finally {
viewer.close();
}
}
return 0;
} catch (EOFException e) {

View File

@ -17,48 +17,51 @@
*/
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* WebImageViewer loads a fsimage and exposes read-only WebHDFS API for its
* namespace.
*/
public class WebImageViewer {
public class WebImageViewer implements Closeable {
public static final Log LOG = LogFactory.getLog(WebImageViewer.class);
private Channel channel;
private InetSocketAddress address;
private final ChannelFactory factory =
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(), 1);
private final ServerBootstrap bootstrap = new ServerBootstrap(factory);
static final ChannelGroup allChannels =
new DefaultChannelGroup("WebImageViewer");
private final ServerBootstrap bootstrap;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final ChannelGroup allChannels;
public WebImageViewer(InetSocketAddress address) {
this.address = address;
this.bossGroup = new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
this.allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
this.bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class);
}
/**
@ -66,13 +69,13 @@ public WebImageViewer(InetSocketAddress address) {
* @param fsimage the fsimage to load.
* @throws IOException if failed to load the fsimage.
*/
public void initServerAndWait(String fsimage) throws IOException {
initServer(fsimage);
public void start(String fsimage) throws IOException {
try {
channel.getCloseFuture().await();
initServer(fsimage);
channel.closeFuture().await();
} catch (InterruptedException e) {
LOG.info("Interrupted. Stopping the WebImageViewer.");
shutdown();
close();
}
}
@ -82,37 +85,26 @@ public void initServerAndWait(String fsimage) throws IOException {
* @throws IOException if fail to load the fsimage.
*/
@VisibleForTesting
public void initServer(String fsimage) throws IOException {
FSImageLoader loader = FSImageLoader.load(fsimage);
public void initServer(String fsimage)
throws IOException, InterruptedException {
final FSImageLoader loader = FSImageLoader.load(fsimage);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("channelTracker", new SimpleChannelUpstreamHandler() {
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
allChannels.add(e.getChannel());
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestDecoder(),
new StringEncoder(),
new HttpResponseEncoder(),
new FSImageHandler(loader, allChannels));
}
});
pipeline.addLast("httpDecoder", new HttpRequestDecoder());
pipeline.addLast("requestHandler", new FSImageHandler(loader));
pipeline.addLast("stringEncoder", new StringEncoder());
pipeline.addLast("httpEncoder", new HttpResponseEncoder());
bootstrap.setPipeline(pipeline);
channel = bootstrap.bind(address);
channel = bootstrap.bind(address).sync().channel();
allChannels.add(channel);
address = (InetSocketAddress) channel.getLocalAddress();
LOG.info("WebImageViewer started. Listening on " + address.toString()
+ ". Press Ctrl+C to stop the viewer.");
}
/**
* Stop WebImageViewer.
*/
@VisibleForTesting
public void shutdown() {
allChannels.close().awaitUninterruptibly();
factory.releaseExternalResources();
address = (InetSocketAddress) channel.localAddress();
LOG.info("WebImageViewer started. Listening on " + address.toString() + ". Press Ctrl+C to stop the viewer.");
}
/**
@ -123,4 +115,11 @@ public void shutdown() {
public int getPort() {
return address.getPort();
}
@Override
public void close() {
allChannels.close().awaitUninterruptibly();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -237,7 +237,7 @@ public int compare(FileStatus first, FileStatus second) {
}
@Test
public void testFileDistributionCalculatorWithOptions() throws IOException {
public void testFileDistributionCalculatorWithOptions() throws Exception {
int status = OfflineImageViewerPB.run(new String[] {"-i",
originalFsimage.getAbsolutePath(), "-o", "-", "-p", "FileDistribution",
"-maxSize", "512", "-step", "8"});
@ -258,8 +258,7 @@ public void testPBImageXmlWriter() throws IOException, SAXException,
}
@Test
public void testWebImageViewer() throws IOException, InterruptedException,
URISyntaxException {
public void testWebImageViewer() throws Exception {
WebImageViewer viewer = new WebImageViewer(
NetUtils.createSocketAddr("localhost:0"));
try {
@ -319,7 +318,7 @@ public void testWebImageViewer() throws IOException, InterruptedException,
connection.getResponseCode());
} finally {
// shutdown the viewer
viewer.shutdown();
viewer.close();
}
}

View File

@ -157,8 +157,7 @@ public static void deleteOriginalFSImage() throws IOException {
}
@Test
public void testWebImageViewerForAcl() throws IOException,
InterruptedException, URISyntaxException {
public void testWebImageViewerForAcl() throws Exception {
WebImageViewer viewer = new WebImageViewer(
NetUtils.createSocketAddr("localhost:0"));
try {
@ -200,7 +199,7 @@ public void testWebImageViewerForAcl() throws IOException,
connection.getResponseCode());
} finally {
// shutdown the viewer
viewer.shutdown();
viewer.close();
}
}
}

View File

@ -536,6 +536,12 @@
<version>3.6.2.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>