From ada233b7cd7db39e609bb57e487fee8cec59cd48 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Sun, 24 May 2015 22:30:32 -0700 Subject: [PATCH] HDFS-8377. Support HTTP/2 in datanode. Contributed by Duo Zhang. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 5 + .../datanode/web/DatanodeHttpServer.java | 9 +- .../web/PortUnificationServerHandler.java | 85 ++++++++++ .../datanode/web/SimpleHttpProxyHandler.java | 18 +-- .../server/datanode/web/URLDispatcher.java | 9 +- .../web/dtp/DtpHttp2FrameListener.java | 52 +++++++ .../datanode/web/dtp/DtpHttp2Handler.java | 34 ++++ .../web/webhdfs/ExceptionHandler.java | 29 ++-- .../datanode/web/webhdfs/HdfsWriter.java | 10 +- .../datanode/web/webhdfs/WebHdfsHandler.java | 57 +++---- .../offlineImageViewer/FSImageHandler.java | 39 ++--- .../web/dtp/Http2ResponseHandler.java | 65 ++++++++ .../server/datanode/web/dtp/TestDtpHttp2.java | 147 ++++++++++++++++++ hadoop-project/pom.xml | 8 +- 15 files changed, 482 insertions(+), 87 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 68775da67f..03f766c427 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -580,6 +580,8 @@ Release 2.8.0 - UNRELEASED HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp. (wheat9) + HDFS-8377. Support HTTP/2 in datanode. (Duo Zhang via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 1edb67dda6..34338d1e59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -181,6 +181,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> netty-all compile + + com.twitter + hpack + compile + xerces xercesImpl diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java index f461ddaa50..613d3ceb82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode.web; -import io.netty.bootstrap.ChannelFactory; +import io.netty.channel.ChannelFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -83,11 +83,8 @@ public DatanodeHttpServer(final Configuration conf, final InetSocketAddress .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline p = ch.pipeline(); - p.addLast(new HttpRequestDecoder(), - new HttpResponseEncoder(), - new ChunkedWriteHandler(), - new URLDispatcher(jettyAddr, conf, confForCreate)); + ch.pipeline().addLast(new PortUnificationServerHandler(jettyAddr, + conf, confForCreate)); } }); if (externalHttpChannel == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java new file mode 100644 index 0000000000..7ebc070482 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web; + +import java.net.InetSocketAddress; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpHttp2Handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.stream.ChunkedWriteHandler; + +/** + * A port unification handler to support HTTP/1.1 and HTTP/2 on the same port. + */ +@InterfaceAudience.Private +public class PortUnificationServerHandler extends ByteToMessageDecoder { + + private static final ByteBuf HTTP2_CLIENT_CONNECTION_PREFACE = Http2CodecUtil + .connectionPrefaceBuf(); + + // we only want to support HTTP/1.1 and HTTP/2, so the first 3 bytes is + // enough. No HTTP/1.1 request could start with "PRI" + private static final int MAGIC_HEADER_LENGTH = 3; + + private final InetSocketAddress proxyHost; + + private final Configuration conf; + + private final Configuration confForCreate; + + public PortUnificationServerHandler(InetSocketAddress proxyHost, + Configuration conf, Configuration confForCreate) { + this.proxyHost = proxyHost; + this.conf = conf; + this.confForCreate = confForCreate; + } + + private void configureHttp1(ChannelHandlerContext ctx) { + ctx.pipeline().addLast(new HttpServerCodec(), new ChunkedWriteHandler(), + new URLDispatcher(proxyHost, conf, confForCreate)); + } + + private void configureHttp2(ChannelHandlerContext ctx) { + ctx.pipeline().addLast(new DtpHttp2Handler()); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, + List out) throws Exception { + if (in.readableBytes() < MAGIC_HEADER_LENGTH) { + return; + } + if (ByteBufUtil.equals(in, 0, HTTP2_CLIENT_CONNECTION_PREFACE, 0, + MAGIC_HEADER_LENGTH)) { + configureHttp2(ctx); + } else { + configureHttp1(ctx); + } + ctx.pipeline().remove(this); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java index ffa7681bb2..6b0f013027 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.datanode.web; +import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -31,17 +34,14 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequestEncoder; import io.netty.handler.codec.http.HttpResponseEncoder; -import org.apache.commons.logging.Log; import java.net.InetSocketAddress; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static io.netty.handler.codec.http.HttpHeaders.Values; -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import org.apache.commons.logging.Log; /** * Dead simple session-layer HTTP proxy. It gets the HTTP responses @@ -98,7 +98,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void channelRead0 (final ChannelHandlerContext ctx, final HttpRequest req) { - uri = req.getUri(); + uri = req.uri(); final Channel client = ctx.channel(); Bootstrap proxiedServer = new Bootstrap() .group(client.eventLoop()) @@ -118,14 +118,14 @@ public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { ctx.channel().pipeline().remove(HttpResponseEncoder.class); HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1, - req.getMethod(), req.getUri()); + req.method(), req.uri()); newReq.headers().add(req.headers()); - newReq.headers().set(CONNECTION, Values.CLOSE); + newReq.headers().set(CONNECTION, HttpHeaderValues.CLOSE); future.channel().writeAndFlush(newReq); } else { DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR); - resp.headers().set(CONNECTION, Values.CLOSE); + resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE); LOG.info("Proxy " + uri + " failed. Cause: ", future.cause()); ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); client.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java index ff3f468499..7627d94bd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java @@ -17,17 +17,16 @@ */ package org.apache.hadoop.hdfs.server.datanode.web; +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpRequest; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler; -import java.io.IOException; import java.net.InetSocketAddress; -import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler; class URLDispatcher extends SimpleChannelInboundHandler { private final InetSocketAddress proxyHost; @@ -44,7 +43,7 @@ class URLDispatcher extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws Exception { - String uri = req.getUri(); + String uri = req.uri(); ChannelPipeline p = ctx.pipeline(); if (uri.startsWith(WEBHDFS_PREFIX)) { WebHdfsHandler h = new WebHdfsHandler(conf, confForCreate); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java new file mode 100644 index 0000000000..41e7cf449b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.dtp; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameAdapter; +import io.netty.handler.codec.http2.Http2Headers; + +import java.nio.charset.StandardCharsets; + +class DtpHttp2FrameListener extends Http2FrameAdapter { + + private Http2ConnectionEncoder encoder; + + public void encoder(Http2ConnectionEncoder encoder) { + this.encoder = encoder; + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, + boolean exclusive, int padding, boolean endStream) throws Http2Exception { + encoder.writeHeaders(ctx, streamId, + new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText()), 0, + false, ctx.newPromise()); + encoder.writeData( + ctx, + streamId, + ctx.alloc().buffer() + .writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8)), 0, true, + ctx.newPromise()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java new file mode 100644 index 0000000000..5b6f279cf2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.dtp; + +import org.apache.hadoop.classification.InterfaceAudience; + +import io.netty.handler.codec.http2.Http2ConnectionHandler; + +/** + * The HTTP/2 handler. + */ +@InterfaceAudience.Private +public class DtpHttp2Handler extends Http2ConnectionHandler { + + public DtpHttp2Handler() { + super(true, new DtpHttp2FrameListener()); + ((DtpHttp2FrameListener) decoder().listener()).encoder(encoder()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ExceptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ExceptionHandler.java index a7bb4907d9..779a397f34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ExceptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ExceptionHandler.java @@ -17,12 +17,21 @@ */ package org.apache.hadoop.hdfs.server.datanode.web.webhdfs; -import com.google.common.base.Charsets; -import com.sun.jersey.api.ParamException; -import com.sun.jersey.api.container.ContainerException; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; + +import java.io.FileNotFoundException; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.web.JsonUtil; import org.apache.hadoop.ipc.RemoteException; @@ -30,17 +39,9 @@ import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.token.SecretManager; -import java.io.FileNotFoundException; -import java.io.IOException; - -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; -import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8; +import com.google.common.base.Charsets; +import com.sun.jersey.api.ParamException; +import com.sun.jersey.api.container.ContainerException; class ExceptionHandler { static Log LOG = WebHdfsHandler.LOG; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/HdfsWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/HdfsWriter.java index 0433ce65ae..99924e5c16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/HdfsWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/HdfsWriter.java @@ -17,21 +17,21 @@ */ package org.apache.hadoop.hdfs.server.datanode.web.webhdfs; +import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; -import org.apache.commons.logging.Log; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.io.IOUtils; import java.io.IOException; import java.io.OutputStream; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE; +import org.apache.commons.logging.Log; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.io.IOUtils; class HdfsWriter extends SimpleChannelInboundHandler { private final DFSClient client; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java index 4d705b0dd9..9eefb6646a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java @@ -17,7 +17,22 @@ */ package org.apache.hadoop.hdfs.server.datanode.web.webhdfs; -import com.google.common.base.Preconditions; +import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS; +import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN; +import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaderNames.LOCATION; +import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.POST; +import static io.netty.handler.codec.http.HttpMethod.PUT; +import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE; +import static io.netty.handler.codec.http.HttpResponseStatus.CREATED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME; +import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -29,6 +44,15 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.stream.ChunkedStream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; +import java.util.EnumSet; + import org.apache.commons.io.Charsets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,30 +73,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.LimitInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.security.PrivilegedExceptionAction; -import java.util.EnumSet; - -import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_METHODS; -import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static io.netty.handler.codec.http.HttpHeaders.Names.LOCATION; -import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE; -import static io.netty.handler.codec.http.HttpMethod.GET; -import static io.netty.handler.codec.http.HttpMethod.POST; -import static io.netty.handler.codec.http.HttpMethod.PUT; -import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE; -import static io.netty.handler.codec.http.HttpResponseStatus.CREATED; -import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME; -import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND; +import com.google.common.base.Preconditions; public class WebHdfsHandler extends SimpleChannelInboundHandler { static final Log LOG = LogFactory.getLog(WebHdfsHandler.class); @@ -99,8 +100,8 @@ public WebHdfsHandler(Configuration conf, Configuration confForCreate) @Override public void channelRead0(final ChannelHandlerContext ctx, final HttpRequest req) throws Exception { - Preconditions.checkArgument(req.getUri().startsWith(WEBHDFS_PREFIX)); - QueryStringDecoder queryString = new QueryStringDecoder(req.getUri()); + Preconditions.checkArgument(req.uri().startsWith(WEBHDFS_PREFIX)); + QueryStringDecoder queryString = new QueryStringDecoder(req.uri()); params = new ParameterParser(queryString, conf); DataNodeUGIProvider ugiProvider = new DataNodeUGIProvider(params); ugi = ugiProvider.ugi(); @@ -119,7 +120,7 @@ public Void run() throws Exception { public void handle(ChannelHandlerContext ctx, HttpRequest req) throws IOException, URISyntaxException { String op = params.op(); - HttpMethod method = req.getMethod(); + HttpMethod method = req.method(); if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op) && method == PUT) { onCreate(ctx); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageHandler.java index 429b6fc487..37db8b7664 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageHandler.java @@ -17,7 +17,18 @@ */ package org.apache.hadoop.hdfs.tools.offlineImageViewer; -import com.google.common.base.Charsets; +import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8; +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX; +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; @@ -30,28 +41,18 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.QueryStringDecoder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.web.JsonUtil; -import org.apache.hadoop.util.StringUtils; import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; import java.util.Map; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE; -import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; -import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8; -import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX; -import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.web.JsonUtil; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Charsets; /** * Implement the read-only WebHDFS API for fsimage. @@ -74,7 +75,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception { - if (request.getMethod() != HttpMethod.GET) { + if (request.method() != HttpMethod.GET) { DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, METHOD_NOT_ALLOWED); resp.headers().set(CONNECTION, CLOSE); @@ -82,7 +83,7 @@ public void channelRead0(ChannelHandlerContext ctx, HttpRequest request) return; } - QueryStringDecoder decoder = new QueryStringDecoder(request.getUri()); + QueryStringDecoder decoder = new QueryStringDecoder(request.uri()); final String op = getOp(decoder); final String content; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java new file mode 100644 index 0000000000..eb8b91817b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.dtp; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http2.HttpUtil; +import io.netty.util.concurrent.Promise; + +import java.util.HashMap; +import java.util.Map; + +public class Http2ResponseHandler extends + SimpleChannelInboundHandler { + + private Map> streamId2Promise = + new HashMap<>(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) + throws Exception { + Integer streamId = + msg.headers().getInt(HttpUtil.ExtensionHeaderNames.STREAM_ID.text()); + if (streamId == null) { + System.err.println("HttpResponseHandler unexpected message received: " + + msg); + return; + } + if (streamId.intValue() == 1) { + // this is the upgrade response message, just ignore it. + return; + } + Promise promise; + synchronized (this) { + promise = streamId2Promise.get(streamId); + } + if (promise == null) { + System.err.println("Message received for unknown stream id " + streamId); + } else { + // Do stuff with the message (for now just print it) + promise.setSuccess(msg.retain()); + + } + } + + public void put(Integer streamId, Promise promise) { + streamId2Promise.put(streamId, promise); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java new file mode 100644 index 0000000000..4e91004314 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.dtp; + +import static org.junit.Assert.assertEquals; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2FrameReader; +import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; +import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionHandler; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2FrameReader; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2InboundFrameLogger; +import io.netty.handler.codec.http2.Http2OutboundFrameLogger; +import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler; +import io.netty.handler.codec.http2.HttpUtil; +import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.timeout.TimeoutException; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestDtpHttp2 { + + private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger( + LogLevel.INFO, TestDtpHttp2.class); + + private static final Configuration CONF = WebHdfsTestUtil.createConf(); + + private static MiniDFSCluster CLUSTER; + + private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup(); + + private static Channel CHANNEL; + + private static Http2ResponseHandler RESPONSE_HANDLER; + + @BeforeClass + public static void setUp() throws IOException, URISyntaxException, + TimeoutException { + CLUSTER = new MiniDFSCluster.Builder(CONF).numDataNodes(1).build(); + CLUSTER.waitActive(); + + RESPONSE_HANDLER = new Http2ResponseHandler(); + Bootstrap bootstrap = + new Bootstrap() + .group(WORKER_GROUP) + .channel(NioSocketChannel.class) + .remoteAddress("127.0.0.1", + CLUSTER.getDataNodes().get(0).getInfoPort()) + .handler(new ChannelInitializer() { + + @Override + protected void initChannel(Channel ch) throws Exception { + Http2Connection connection = new DefaultHttp2Connection(false); + Http2ConnectionHandler connectionHandler = + new HttpToHttp2ConnectionHandler(connection, frameReader(), + frameWriter(), new DelegatingDecompressorFrameListener( + connection, new InboundHttp2ToHttpAdapter.Builder( + connection).maxContentLength(Integer.MAX_VALUE) + .propagateSettings(true).build())); + ch.pipeline().addLast(connectionHandler, RESPONSE_HANDLER); + } + }); + CHANNEL = bootstrap.connect().syncUninterruptibly().channel(); + + } + + @AfterClass + public static void tearDown() throws IOException { + if (CHANNEL != null) { + CHANNEL.close().syncUninterruptibly(); + } + WORKER_GROUP.shutdownGracefully(); + if (CLUSTER != null) { + CLUSTER.shutdown(); + } + } + + private static Http2FrameReader frameReader() { + return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), + FRAME_LOGGER); + } + + private static Http2FrameWriter frameWriter() { + return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), + FRAME_LOGGER); + } + + @Test + public void test() throws InterruptedException, ExecutionException { + int streamId = 3; + FullHttpRequest request = + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(), + streamId); + Promise promise = CHANNEL.eventLoop().newPromise(); + synchronized (RESPONSE_HANDLER) { + CHANNEL.writeAndFlush(request); + RESPONSE_HANDLER.put(streamId, promise); + } + assertEquals(HttpResponseStatus.OK, promise.get().status()); + ByteBuf content = promise.get().content(); + assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8)); + } +} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 78903fae2d..1d0262fcdc 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -579,7 +579,13 @@ io.netty netty-all - 4.0.23.Final + 4.1.0.Beta5 + + + + com.twitter + hpack + 0.11.0