diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 12255852a9..7a898741fa 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -3,6 +3,7 @@ Hadoop Change Log
Trunk (unreleased changes)
INCOMPATIBLE CHANGES
+ HADOOP-7920. Remove Avro Rpc. (suresh)
NEW FEATURES
HADOOP-7773. Add support for protocol buffer based RPC engine.
@@ -136,6 +137,10 @@ Trunk (unreleased changes)
HADOOP-7913 Fix bug in ProtoBufRpcEngine (sanjay)
+ HADOOP-7810. move hadoop archive to core from tools. (tucu)
+
+ HADOOP-7892. IPC logs too verbose after "RpcKind" introduction (todd)
+
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 1dc545a274..f477d385a5 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -239,11 +239,6 @@
       <artifactId>avro</artifactId>
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-ipc</artifactId>
-      <scope>compile</scope>
-    </dependency>
     <dependency>
       <groupId>net.sf.kosmosfs</groupId>
       <artifactId>kfs</artifactId>
@@ -282,7 +277,6 @@
             <phase>generate-test-sources</phase>
             <goals>
               <goal>schema</goal>
-              <goal>protocol</goal>
             </goals>
diff --git a/hadoop-mapreduce-project/src/tools/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
similarity index 100%
rename from hadoop-mapreduce-project/src/tools/org/apache/hadoop/fs/HarFileSystem.java
rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
deleted file mode 100644
index 8fec3d22b8..0000000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
-import javax.net.SocketFactory;
-
-import org.apache.avro.ipc.Responder;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.reflect.ReflectRequestor;
-import org.apache.avro.ipc.reflect.ReflectResponder;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-/** Tunnel Avro-format RPC requests over a Hadoop {@link RPC} connection. This
- * does not give cross-language wire compatibility, since the Hadoop RPC wire
- * format is non-standard, but it does permit use of Avro's protocol versioning
- * features for inter-Java RPCs. */
-@InterfaceStability.Evolving
-public class AvroRpcEngine implements RpcEngine {
- private static final Log LOG = LogFactory.getLog(RPC.class);
-
- private static int VERSION = 1;
-
- // the implementation we tunnel through
- private static final RpcEngine ENGINE = new WritableRpcEngine();
-
- /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
- private static interface TunnelProtocol extends VersionedProtocol {
- //WritableRpcEngine expects a versionID in every protocol.
- public static final long versionID = VERSION;
- /** All Avro methods and responses go through this. */
- BufferListWritable call(String protocol, BufferListWritable request)
- throws IOException;
- }
-
- /** A Writable that holds a List<ByteBuffer>, the Avro RPC Transceiver's
- * basic unit of data transfer.*/
- private static class BufferListWritable implements Writable {
- private List<ByteBuffer> buffers;
-
- public BufferListWritable() {} // required for RPC Writables
-
- public BufferListWritable(List<ByteBuffer> buffers) {
- this.buffers = buffers;
- }
-
- public void readFields(DataInput in) throws IOException {
- int size = in.readInt();
- buffers = new ArrayList<ByteBuffer>(size);
- for (int i = 0; i < size; i++) {
- int length = in.readInt();
- ByteBuffer buffer = ByteBuffer.allocate(length);
- in.readFully(buffer.array(), 0, length);
- buffers.add(buffer);
- }
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeInt(buffers.size());
- for (ByteBuffer buffer : buffers) {
- out.writeInt(buffer.remaining());
- out.write(buffer.array(), buffer.position(), buffer.remaining());
- }
- }
- }
-
- /** An Avro RPC Transceiver that tunnels client requests through Hadoop
- * RPC. */
- private static class ClientTransceiver extends Transceiver {
- private TunnelProtocol tunnel;
- private InetSocketAddress remote;
- private String protocol;
-
- public ClientTransceiver(InetSocketAddress addr,
- UserGroupInformation ticket,
- Configuration conf, SocketFactory factory,
- int rpcTimeout, String protocol)
- throws IOException {
- this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION,
- addr, ticket, conf, factory,
- rpcTimeout).getProxy();
- this.remote = addr;
- this.protocol = protocol;
- }
-
- public String getRemoteName() { return remote.toString(); }
-
- public List<ByteBuffer> transceive(List<ByteBuffer> request)
- throws IOException {
- return tunnel.call(protocol, new BufferListWritable(request)).buffers;
- }
-
- public List<ByteBuffer> readBuffers() throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public void close() throws IOException {
- RPC.stopProxy(tunnel);
- }
- }
-
- /** Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- * @param <T> */
- @SuppressWarnings("unchecked")
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory,
- int rpcTimeout)
- throws IOException {
- return new ProtocolProxy<T>(protocol,
- (T)Proxy.newProxyInstance(
- protocol.getClassLoader(),
- new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)),
- false);
- }
-
- private class Invoker implements InvocationHandler, Closeable {
- private final ClientTransceiver tx;
- private final SpecificRequestor requestor;
- public Invoker(Class<?> protocol, InetSocketAddress addr,
- UserGroupInformation ticket, Configuration conf,
- SocketFactory factory,
- int rpcTimeout) throws IOException {
- this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout,
- protocol.getName());
- this.requestor = createRequestor(protocol, tx);
- }
- @Override public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- return requestor.invoke(proxy, method, args);
- }
- public void close() throws IOException {
- tx.close();
- }
- }
-
- protected SpecificRequestor createRequestor(Class<?> protocol,
- Transceiver transeiver) throws IOException {
- return new ReflectRequestor(protocol, transeiver);
- }
-
- protected Responder createResponder(Class<?> iface, Object impl) {
- return new ReflectResponder(iface, impl);
- }
-
- /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
- private class TunnelResponder implements TunnelProtocol {
- private Map<String, Responder> responders =
- new HashMap<String, Responder>();
-
- public void addProtocol(Class<?> iface, Object impl) {
- responders.put(iface.getName(), createResponder(iface, impl));
- }
-
- @Override
- public long getProtocolVersion(String protocol, long version)
- throws IOException {
- return VERSION;
- }
-
- @Override
- public ProtocolSignature getProtocolSignature(
- String protocol, long version, int clientMethodsHashCode)
- throws IOException {
- return ProtocolSignature.getProtocolSignature
- (clientMethodsHashCode, VERSION, TunnelProtocol.class);
- }
-
- public BufferListWritable call(String protocol, BufferListWritable request)
- throws IOException {
- Responder responder = responders.get(protocol);
- if (responder == null)
- throw new IOException("No responder for: "+protocol);
- return new BufferListWritable(responder.respond(request.buffers));
- }
-
- }
-
- public Object[] call(Method method, Object[][] params,
- InetSocketAddress[] addrs, UserGroupInformation ticket,
- Configuration conf) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- private class Server extends WritableRpcEngine.Server {
- private TunnelResponder responder = new TunnelResponder();
-
- public Server(Class<?> iface, Object impl, String bindAddress,
- int port, int numHandlers, int numReaders,
- int queueSizePerHandler, boolean verbose,
- Configuration conf,
- SecretManager<? extends TokenIdentifier> secretManager
- ) throws IOException {
- super((Class)null, new Object(), conf,
- bindAddress, port, numHandlers, numReaders,
- queueSizePerHandler, verbose, secretManager);
- // RpcKind is WRITABLE since Avro is tunneled through WRITABLE
- super.addProtocol(RpcKind.RPC_WRITABLE, TunnelProtocol.class, responder);
- responder.addProtocol(iface, impl);
- }
-
-
- @Override
- public Server
- addProtocol(RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl)
- throws IOException {
- responder.addProtocol(protocolClass, protocolImpl);
- return this;
- }
- }
-
- /** Construct a server for a protocol implementation instance listening on a
- * port and address. */
- public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
- int port, int numHandlers, int numReaders,
- int queueSizePerHandler, boolean verbose,
- Configuration conf,
- SecretManager<? extends TokenIdentifier> secretManager
- ) throws IOException {
- return new Server
- (iface, impl, bindAddress, port, numHandlers, numReaders,
- queueSizePerHandler, verbose, conf, secretManager);
- }
-
-}
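
The engine deleted above tunnels every Avro call through one Writable RPC method, framing requests and responses as a list of length-prefixed buffers. A self-contained sketch of that framing, distilled from the BufferListWritable code (class and method names here are illustrative, not Hadoop API):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch: an int buffer count, then each buffer as an
    // int length followed by its raw bytes (buffers are assumed to be
    // array-backed, as ByteBuffer.allocate() guarantees).
    public class BufferListFraming {
      public static void write(DataOutput out, List<ByteBuffer> buffers)
          throws IOException {
        out.writeInt(buffers.size());
        for (ByteBuffer b : buffers) {
          out.writeInt(b.remaining());
          out.write(b.array(), b.position(), b.remaining());
        }
      }

      public static List<ByteBuffer> read(DataInput in) throws IOException {
        int size = in.readInt();
        List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(size);
        for (int i = 0; i < size; i++) {
          byte[] data = new byte[in.readInt()];
          in.readFully(data);
          buffers.add(ByteBuffer.wrap(data));
        }
        return buffers;
      }
    }
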
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
index 430e0a9dea..6e97159fb4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
@@ -56,9 +56,8 @@ static RpcPayloadOperation readFields(DataInput in) throws IOException {
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
- RPC_PROTOCOL_BUFFER ((short) 3), // Use ProtobufRpcEngine
- RPC_AVRO ((short) 4); // Use AvroRpcEngine
- static final short MAX_INDEX = RPC_AVRO.value; // used for array size
+ RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
+ static final short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
private static final short FIRST_INDEX = RPC_BUILTIN.value;
private final short value;
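
MAX_INDEX shrinks from RPC_AVRO.value (4) to RPC_PROTOCOL_BUFFER.value (3) because, as the comment says, it sizes per-kind lookup arrays. A minimal sketch of that sizing pattern, using hypothetical names rather than the actual Server internals:

    // Hypothetical sketch: one slot per RpcKind value. Values start at
    // FIRST_INDEX (1), so value - FIRST_INDEX is the array offset and
    // MAX_INDEX slots cover every kind.
    class PerKindTable<T> {
      private static final short FIRST_INDEX = 1;
      private static final short MAX_INDEX = 3; // RPC_PROTOCOL_BUFFER.value
      private final Object[] slots = new Object[MAX_INDEX];

      void register(short kindValue, T engine) {
        slots[kindValue - FIRST_INDEX] = engine;
      }

      @SuppressWarnings("unchecked")
      T lookup(short kindValue) {
        return (T) slots[kindValue - FIRST_INDEX];
      }
    }
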
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 52ea35c522..b9220a6df5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -170,7 +170,7 @@ public static void registerProtocolEngine(RpcKind rpcKind,
throw new IllegalArgumentException("ReRegistration of rpcKind: " +
rpcKind);
}
- LOG.info("rpcKind=" + rpcKind +
+ LOG.debug("rpcKind=" + rpcKind +
", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
", rpcInvoker=" + rpcInvoker);
}
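
Demoting this registration message to DEBUG is the HADOOP-7892 fix for over-verbose IPC logs. With commons-logging the string concatenation still runs even when DEBUG is disabled; registration is rare, so it hardly matters here, but the conventional guard would look like:

    if (LOG.isDebugEnabled()) {
      LOG.debug("rpcKind=" + rpcKind +
          ", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
          ", rpcInvoker=" + rpcInvoker);
    }
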
diff --git a/hadoop-common-project/hadoop-common/src/test/avro/AvroSpecificTestProtocol.avpr b/hadoop-common-project/hadoop-common/src/test/avro/AvroSpecificTestProtocol.avpr
deleted file mode 100644
index 18960c1de4..0000000000
--- a/hadoop-common-project/hadoop-common/src/test/avro/AvroSpecificTestProtocol.avpr
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-{
- "protocol" : "AvroSpecificTestProtocol",
- "namespace" : "org.apache.hadoop.ipc",
-
- "messages" : {
- "echo" : {
- "request" : [ {
- "name" : "message",
- "type" : "string"
- } ],
- "response" : "string"
- },
-
- "add" : {
- "request" : [ {
- "name" : "arg1",
- "type" : "int"
- }, {
- "name" : "arg2",
- "type" : "int",
- "default" : 0
- } ],
- "response" : "int"
- }
- }
-}
\ No newline at end of file
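
The "protocol" goal removed from hadoop-common's POM earlier in this patch is what compiled this .avpr into Java sources. Judging from the deleted TestAvroRpc below, the generated interface had roughly this shape (a reconstruction, not the actual generated file, which also embedded the parsed protocol schema):

    package org.apache.hadoop.ipc;

    import org.apache.avro.AvroRemoteException;

    // Approximate shape of Avro's specific-compiler output for
    // AvroSpecificTestProtocol.avpr.
    public interface AvroSpecificTestProtocol {
      CharSequence echo(CharSequence message) throws AvroRemoteException;
      int add(int arg1, int arg2) throws AvroRemoteException;
    }
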
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
similarity index 100%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/TestHarFileSystem.java
rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
deleted file mode 100644
index 5ce3359428..0000000000
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import javax.security.sasl.Sasl;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.util.Utf8;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.ipc.TestSaslRPC.CustomSecurityInfo;
-import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier;
-import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-
-/** Unit tests for AvroRpc. */
-public class TestAvroRpc extends TestCase {
- private static final String ADDRESS = "0.0.0.0";
-
- public static final Log LOG =
- LogFactory.getLog(TestAvroRpc.class);
-
- int datasize = 1024*100;
- int numThreads = 50;
-
- public TestAvroRpc(String name) { super(name); }
-
- public static interface EmptyProtocol {}
- public static class EmptyImpl implements EmptyProtocol {}
-
- public static class TestImpl implements AvroTestProtocol {
-
- public void ping() {}
-
- public String echo(String value) { return value; }
-
- public int add(int v1, int v2) { return v1 + v2; }
-
- public int error() throws Problem {
- throw new Problem();
- }
- }
-
- public void testReflect() throws Exception {
- testReflect(false);
- }
-
- public void testSecureReflect() throws Exception {
- testReflect(true);
- }
-
- public void testSpecific() throws Exception {
- testSpecific(false);
- }
-
- public void testSecureSpecific() throws Exception {
- testSpecific(true);
- }
-
- private void testReflect(boolean secure) throws Exception {
- Configuration conf = new Configuration();
- TestTokenSecretManager sm = null;
- if (secure) {
- makeSecure(conf);
- sm = new TestTokenSecretManager();
- }
- UserGroupInformation.setConfiguration(conf);
- RPC.setProtocolEngine(conf, EmptyProtocol.class, AvroRpcEngine.class);
- RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
- RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(),
- ADDRESS, 0, 5, true, conf, sm);
- server.addProtocol(RpcKind.RPC_WRITABLE,
- AvroTestProtocol.class, new TestImpl());
-
- try {
- server.start();
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
-
- if (secure) {
- addToken(sm, addr);
- //QOP must be auth
- Assert.assertEquals("auth", SaslRpcServer.SASL_PROPS.get(Sasl.QOP));
- }
-
- AvroTestProtocol proxy =
- (AvroTestProtocol)RPC.getProxy(AvroTestProtocol.class, 0, addr, conf);
-
- proxy.ping();
-
- String echo = proxy.echo("hello world");
- assertEquals("hello world", echo);
-
- int intResult = proxy.add(1, 2);
- assertEquals(3, intResult);
-
- boolean caught = false;
- try {
- proxy.error();
- } catch (AvroRemoteException e) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Caught " + e);
- }
- caught = true;
- }
- assertTrue(caught);
-
- } finally {
- resetSecurity();
- server.stop();
- }
- }
-
- private void makeSecure(Configuration conf) {
- conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- conf.set("hadoop.rpc.socket.factory.class.default", "");
- //Avro doesn't work with security annotations on protocol.
- //Avro works ONLY with custom security context
- SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo());
- }
-
- private void resetSecurity() {
- SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
- }
-
- private void addToken(TestTokenSecretManager sm,
- InetSocketAddress addr) throws IOException {
- final UserGroupInformation current = UserGroupInformation.getCurrentUser();
-
- TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
- .getUserName()));
- Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
- sm);
- Text host = new Text(addr.getAddress().getHostAddress() + ":"
- + addr.getPort());
- token.setService(host);
- LOG.info("Service IP address for token is " + host);
- current.addToken(token);
- }
-
- private void testSpecific(boolean secure) throws Exception {
- Configuration conf = new Configuration();
- TestTokenSecretManager sm = null;
- if (secure) {
- makeSecure(conf);
- sm = new TestTokenSecretManager();
- }
- UserGroupInformation.setConfiguration(conf);
- RPC.setProtocolEngine(conf, AvroSpecificTestProtocol.class,
- AvroSpecificRpcEngine.class);
- Server server = RPC.getServer(AvroSpecificTestProtocol.class,
- new AvroSpecificTestProtocolImpl(), ADDRESS, 0, 5, true,
- conf, sm);
- try {
- server.start();
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
-
- if (secure) {
- addToken(sm, addr);
- //QOP must be auth
- Assert.assertEquals("auth", SaslRpcServer.SASL_PROPS.get(Sasl.QOP));
- }
-
- AvroSpecificTestProtocol proxy =
- (AvroSpecificTestProtocol)RPC.getProxy(AvroSpecificTestProtocol.class,
- 0, addr, conf);
-
- CharSequence echo = proxy.echo("hello world");
- assertEquals("hello world", echo.toString());
-
- int intResult = proxy.add(1, 2);
- assertEquals(3, intResult);
-
- } finally {
- resetSecurity();
- server.stop();
- }
- }
-
- public static class AvroSpecificTestProtocolImpl implements
- AvroSpecificTestProtocol {
-
- @Override
- public int add(int arg1, int arg2) throws AvroRemoteException {
- return arg1 + arg2;
- }
-
- @Override
- public CharSequence echo(CharSequence msg) throws AvroRemoteException {
- return msg;
- }
-
- }
-
-}
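
With both Avro engines gone, per-protocol engine selection still works the way this deleted test demonstrated, only against the remaining engines (WritableRpcEngine by default, ProtobufRpcEngine from HADOOP-7773). An illustrative sketch with a placeholder protocol interface:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.ProtobufRpcEngine;
    import org.apache.hadoop.ipc.RPC;

    public class EngineSelection {
      // Placeholder protocol; a real protobuf-based protocol pairs this
      // with service stubs generated by protoc.
      public interface MyPbProtocol {}

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same selection API the deleted test used for AvroRpcEngine.
        RPC.setProtocolEngine(conf, MyPbProtocol.class, ProtobufRpcEngine.class);
      }
    }
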
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
index 8ae1563541..fbe56b6395 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
@@ -269,6 +269,13 @@
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-eclipse-plugin</artifactId>
+        <version>2.6</version>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
diff --git a/hadoop-hdfs-project/pom.xml b/hadoop-hdfs-project/pom.xml
index 824edc3210..299d6f8634 100644
--- a/hadoop-hdfs-project/pom.xml
+++ b/hadoop-hdfs-project/pom.xml
@@ -30,6 +30,7 @@
     <module>hadoop-hdfs</module>
     <module>hadoop-hdfs-httpfs</module>
+    <module>hadoop-hdfs/src/contrib/bkjournal</module>
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 7edc98da35..6748d60a7f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -3,6 +3,7 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
INCOMPATIBLE CHANGES
+ MAPREDUCE-3545. Remove Avro RPC. (suresh)
NEW FEATURES
@@ -77,6 +78,12 @@ Trunk (unreleased changes)
MAPREDUCE-3389. MRApps loads the 'mrapp-generated-classpath' file with
classpath from the build machine. (tucu)
+ MAPREDUCE-3544. gridmix build is broken, requires hadoop-archives to be added as
+ ivy dependency. (tucu)
+
+ MAPREDUCE-3557. MR1 tests fail to compile because of missing hadoop-archives dependency.
+ (tucu)
+
Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES
@@ -85,6 +92,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3121. NodeManager should handle disk-failures (Ravi Gummadi via mahadev)
+ MAPREDUCE-2863. Support web services for YARN and MR components. (Thomas
+ Graves via vinodkv)
+
IMPROVEMENTS
MAPREDUCE-3297. Moved log related components into yarn-common so that
@@ -276,6 +286,22 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3537. Fix race condition in DefaultContainerExecutor which led
to container localization occuring in wrong directories. (acmurthy)
+ MAPREDUCE-3542. Support "FileSystemCounter" legacy counter group name for
+ compatibility. (tomwhite)
+
+ MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs
+ in the correct directory to work properly in secure mode. (Hitesh Shah via
+ vinodkv)
+
+ MAPREDUCE-3541. Fix broken TestJobQueueClient test. (Ravi Prakash via
+ mahadev)
+
+ MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode.
+ (Siddharth Seth via vinodkv)
+
+ MAPREDUCE-3530. Fixed an NPE occurring during scheduling in the
+ ResourceManager. (Arun C Murthy via vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index d2400f053f..cb3e80b8b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -22,20 +22,19 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
-import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -47,13 +46,12 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.service.AbstractService;
/**
@@ -80,7 +78,10 @@ public LocalContainerLauncher(AppContext context,
super(LocalContainerLauncher.class.getName());
this.context = context;
this.umbilical = umbilical;
- // umbilical: MRAppMaster creates (taskAttemptListener), passes to us (TODO/FIXME: pointless to use RPC to talk to self; should create LocalTaskAttemptListener or similar: implement umbilical protocol but skip RPC stuff)
+ // umbilical: MRAppMaster creates (taskAttemptListener), passes to us
+ // (TODO/FIXME: pointless to use RPC to talk to self; should create
+ // LocalTaskAttemptListener or similar: implement umbilical protocol
+ // but skip RPC stuff)
try {
curFC = FileContext.getFileContext(curDir.toURI());
@@ -152,7 +153,6 @@ public void handle(ContainerLauncherEvent event) {
* ]]
* - runs Task (runSubMap() or runSubReduce())
* - TA can safely send TA_UPDATE since in RUNNING state
- * [modulo possible TA-state-machine race noted below: CHECK (TODO)]
*/
private class SubtaskRunner implements Runnable {
@@ -162,6 +162,7 @@ private class SubtaskRunner implements Runnable {
SubtaskRunner() {
}
+ @SuppressWarnings("unchecked")
@Override
public void run() {
ContainerLauncherEvent event = null;
@@ -183,7 +184,7 @@ public void run() {
ContainerRemoteLaunchEvent launchEv =
(ContainerRemoteLaunchEvent)event;
- TaskAttemptId attemptID = launchEv.getTaskAttemptID(); //FIXME: can attemptID ever be null? (only if retrieved over umbilical?)
+ TaskAttemptId attemptID = launchEv.getTaskAttemptID();
Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
int numMapTasks = job.getTotalMaps();
@@ -204,7 +205,6 @@ public void run() {
// port number is set to -1 in this case.
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(attemptID, -1));
- //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter)
if (numMapTasks == 0) {
doneWithMaps = true;
@@ -259,6 +259,7 @@ public void run() {
}
}
+ @SuppressWarnings("deprecation")
private void runSubtask(org.apache.hadoop.mapred.Task task,
final TaskType taskType,
TaskAttemptId attemptID,
@@ -270,6 +271,19 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
try {
JobConf conf = new JobConf(getConfig());
+ conf.set(JobContext.TASK_ID, task.getTaskID().toString());
+ conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString());
+ conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP));
+ conf.setInt(JobContext.TASK_PARTITION, task.getPartition());
+ conf.set(JobContext.ID, task.getJobID().toString());
+
+ // Use the AM's local dir env to generate the intermediate step
+ // output files
+ String[] localSysDirs = StringUtils.getTrimmedStrings(
+ System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+ conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+ LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+ + conf.get(MRConfig.LOCAL_DIR));
// mark this as an uberized subtask so it can set task counter
// (longer-term/FIXME: could redefine as job counter and send
@@ -285,12 +299,12 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
if (doneWithMaps) {
LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
+ attemptID + "), but should be finished with maps");
- // throw new RuntimeException() (FIXME: what's appropriate here?)
+ throw new RuntimeException();
}
MapTask map = (MapTask)task;
+ map.setConf(conf);
- //CODE-REVIEWER QUESTION: why not task.getConf() or map.getConf() instead of conf? do we need Task's localizeConfiguration() run on this first?
map.run(conf, umbilical);
if (renameOutputs) {
@@ -305,19 +319,23 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
} else /* TaskType.REDUCE */ {
if (!doneWithMaps) {
- //check if event-queue empty? whole idea of counting maps vs. checking event queue is a tad wacky...but could enforce ordering (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?): doesn't send reduce event until maps all done]
+ // check if event-queue empty? whole idea of counting maps vs.
+ // checking event queue is a tad wacky...but could enforce ordering
+ // (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):
+ // doesn't send reduce event until maps all done]
LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+ attemptID + "), but not yet finished with maps");
- // throw new RuntimeException() (FIXME) // or push reduce event back onto end of queue? (probably former)
+ throw new RuntimeException();
}
- ReduceTask reduce = (ReduceTask)task;
-
// a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
// set framework name to local to make task local
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle
+ ReduceTask reduce = (ReduceTask)task;
+ reduce.setConf(conf);
+
reduce.run(conf, umbilical);
//relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
}
@@ -334,18 +352,7 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
try {
if (task != null) {
// do cleanup for the task
-// if (childUGI == null) { // no need to job into doAs block
- task.taskCleanup(umbilical);
-// } else {
-// final Task taskFinal = task;
-// childUGI.doAs(new PrivilegedExceptionAction