diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 930444bec8..a7fe03c257 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -61,6 +61,8 @@ Trunk (unreleased changes) HDFS-2395. Add a root element in the JSON responses of webhdfs. (szetszwo) + HDFS-2181 Separate HDFS Client wire protocol data types (sanjay) + BUG FIXES HDFS-2287. TestParallelRead has a small off-by-one bug. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 41fc651074..0e34dae9a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs; import java.io.BufferedOutputStream; +import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; @@ -252,7 +253,7 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf, this.clientName = leaserenewer.getClientName(dfsClientConf.taskId); this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); if (nameNodeAddr != null && rpcNamenode == null) { - this.namenode = DFSUtil.createNamenode(nameNodeAddr, conf); + this.namenode = DFSUtil.createNamenode(nameNodeAddr, conf, ugi); } else if (nameNodeAddr == null && rpcNamenode != null) { //This case is used for testing. this.namenode = rpcNamenode; @@ -333,12 +334,31 @@ void renewLease() throws IOException { namenode.renewLease(clientName); } } - + + /** + * Close connections the Namenode. + * The namenode variable is either a rpcProxy passed by a test or + * created using the protocolTranslator which is closeable. + * If closeable then call close, else close using RPC.stopProxy(). + */ + void closeConnectionToNamenode() { + if (namenode instanceof Closeable) { + try { + ((Closeable) namenode).close(); + return; + } catch (IOException e) { + // fall through - lets try the stopProxy + LOG.warn("Exception closing namenode, stopping the proxy"); + } + } + RPC.stopProxy(namenode); + } + /** Abort and release resources held. Ignore all errors. */ void abort() { clientRunning = false; closeAllFilesBeingWritten(true); - RPC.stopProxy(namenode); // close connections to the namenode + closeConnectionToNamenode(); } /** Close/abort all files being written. 
*/ @@ -378,7 +398,7 @@ public synchronized void close() throws IOException { clientRunning = false; leaserenewer.closeClient(this); // close connections to the namenode - RPC.stopProxy(namenode); + closeConnectionToNamenode(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 45fb3948e4..593c653b87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -31,33 +31,23 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.StringTokenizer; -import java.util.concurrent.TimeUnit; +import javax.net.SocketFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.io.retry.RetryProxy; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; @@ -611,80 +601,46 @@ public static int roundBytesToGB(long bytes) { /** Create a {@link NameNode} proxy */ - public static ClientProtocol createNamenode(Configuration conf) throws IOException { + public static ClientProtocol createNamenode(Configuration conf) + throws IOException { return createNamenode(NameNode.getAddress(conf), conf); } /** Create a {@link NameNode} proxy */ public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr, - Configuration conf) throws IOException { - return createNamenode(createRPCNamenode(nameNodeAddr, conf, - UserGroupInformation.getCurrentUser())); - + Configuration conf) throws IOException { + return createNamenode(nameNodeAddr, conf, + UserGroupInformation.getCurrentUser()); } - - /** Create a {@link NameNode} proxy */ - static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr, - Configuration conf, UserGroupInformation ugi) - throws IOException { - return (ClientProtocol)RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, nameNodeAddr, ugi, conf, - NetUtils.getSocketFactory(conf, ClientProtocol.class)); - } - - /** Create a {@link NameNode} proxy */ - static ClientProtocol createNamenode(ClientProtocol rpcNamenode) - throws IOException { - RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( - 5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); - Map,RetryPolicy> remoteExceptionToPolicyMap = - new HashMap, 
RetryPolicy>(); - remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy); - - Map,RetryPolicy> exceptionToPolicyMap = - new HashMap, RetryPolicy>(); - exceptionToPolicyMap.put(RemoteException.class, - RetryPolicies.retryByRemoteException( - RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap)); - RetryPolicy methodPolicy = RetryPolicies.retryByException( - RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); - Map methodNameToPolicyMap = new HashMap(); - - methodNameToPolicyMap.put("create", methodPolicy); - - return (ClientProtocol) RetryProxy.create(ClientProtocol.class, - rpcNamenode, methodNameToPolicyMap); + /** Create a {@link NameNode} proxy */ + public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr, + Configuration conf, UserGroupInformation ugi) throws IOException { + /** + * Currently we have simply burnt-in support for a SINGLE + * protocol - protocolR23Compatible. This will be replaced + * by a way to pick the right protocol based on the + * version of the target server. + */ + return new org.apache.hadoop.hdfs.protocolR23Compatible. + ClientNamenodeProtocolTranslatorR23(nameNodeAddr, conf, ugi); } /** Create a {@link ClientDatanodeProtocol} proxy */ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( DatanodeID datanodeid, Configuration conf, int socketTimeout, - LocatedBlock locatedBlock) - throws IOException { - InetSocketAddress addr = NetUtils.createSocketAddr( - datanodeid.getHost() + ":" + datanodeid.getIpcPort()); - if (ClientDatanodeProtocol.LOG.isDebugEnabled()) { - ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr); - } - - // Since we're creating a new UserGroupInformation here, we know that no - // future RPC proxies will be able to re-use the same connection. And - // usages of this proxy tend to be one-off calls. - // - // This is a temporary fix: callers should really achieve this by using - // RPC.stopProxy() on the resulting object, but this is currently not - // working in trunk. See the discussion on HDFS-1965. - Configuration confWithNoIpcIdle = new Configuration(conf); - confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic - .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); - - UserGroupInformation ticket = UserGroupInformation - .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString()); - ticket.addToken(locatedBlock.getBlockToken()); - return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class, - ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle, - NetUtils.getDefaultSocketFactory(conf), socketTimeout); + LocatedBlock locatedBlock) throws IOException { + return new org.apache.hadoop.hdfs.protocolR23Compatible. + ClientDatanodeProtocolTranslatorR23(datanodeid, conf, socketTimeout, + locatedBlock); + } + + /** Create a {@link ClientDatanodeProtocol} proxy */ + public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory) throws IOException { + return new org.apache.hadoop.hdfs.protocolR23Compatible. 
+ ClientDatanodeProtocolTranslatorR23(addr, ticket, conf, factory); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index a48fd59799..cfd0405237 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -39,8 +39,30 @@ public interface ClientDatanodeProtocol extends VersionedProtocol { public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class); + /** + * Until version 9, this class ClientDatanodeProtocol served as both + * the client interface to the DN AND the RPC protocol used to + * communicate with the NN. + * + * Post version 10 (release 23 of Hadoop), the protocol is implemented in + * {@literal ../protocolR23Compatible/ClientDatanodeWireProtocol} + * + * This class is used by both the DFSClient and the + * DN server side to insulate from the protocol serialization. + * + * If you are adding/changing DN's interface then you need to + * change both this class and ALSO + * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol}. + * These changes need to be done in a compatible fashion as described in + * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol} + * + * The log of historical changes can be retrieved from the svn). * 9: Added deleteBlockPool method + * + * 9 is the last version id when this class was used for protocols + * serialization. DO not update this version any further. + * Changes are recorded in R23 classes. */ public static final long versionID = 9L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index e69a2727b4..b9f8bec13d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -64,10 +64,28 @@ public interface ClientProtocol extends VersionedProtocol { /** - * Compared to the previous version the following changes have been introduced: - * (Only the latest change is reflected. + * Until version 69, this class ClientProtocol served as both + * the client interface to the NN AND the RPC protocol used to + * communicate with the NN. + * + * Post version 70 (release 23 of Hadoop), the protocol is implemented in + * {@literal ../protocolR23Compatible/ClientNamenodeWireProtocol} + * + * This class is used by both the DFSClient and the + * NN server side to insulate from the protocol serialization. + * + * If you are adding/changing NN's interface then you need to + * change both this class and ALSO + * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}. + * These changes need to be done in a compatible fashion as described in + * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol} + * * The log of historical changes can be retrieved from the svn). * 69: Eliminate overloaded method names. + * + * 69L is the last version id when this class was used for protocols + * serialization. DO not update this version any further. + * Changes are recorded in R23 classes. 
*/ public static final long versionID = 69L; @@ -365,11 +383,8 @@ public boolean complete(String src, String clientName, ExtendedBlock last) * @return true if successful, or false if the old name does not exist * or if the new name already belongs to the namespace. * - * @throws IOException an I/O error occurred - * - * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead. + * @throws IOException an I/O error occurred */ - @Deprecated public boolean rename(String src, String dst) throws UnresolvedLinkException, IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index efbfc9b6fe..6bf4481c6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -24,7 +24,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DeprecatedUTF8; import org.apache.hadoop.io.WritableComparable; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index af3283ee71..7c52c1f275 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -75,6 +75,13 @@ public enum AdminStates { public String toString() { return value; } + + public static AdminStates fromValue(final String value) { + for (AdminStates as : AdminStates.values()) { + if (as.value.equals(value)) return as; + } + return NORMAL; + } } @Nullable @@ -110,11 +117,20 @@ public DatanodeInfo(DatanodeID nodeID) { this.adminState = null; } - protected DatanodeInfo(DatanodeID nodeID, String location, String hostName) { + public DatanodeInfo(DatanodeID nodeID, String location, String hostName) { this(nodeID); this.location = location; this.hostName = hostName; } + + public DatanodeInfo(DatanodeID nodeID, String location, String hostName, + final long capacity, final long dfsUsed, final long remaining, + final long blockPoolUsed, final long lastUpdate, final int xceiverCount, + final AdminStates adminState) { + this(nodeID.getName(), nodeID.getStorageID(), nodeID.getInfoPort(), nodeID + .getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed, lastUpdate, + xceiverCount, location, hostName, adminState); + } /** Constructor */ public DatanodeInfo(final String name, final String storageID, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 4a456c94f7..008eb5ad8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -30,7 +30,16 @@ public final class HdfsConstants { /* Hidden constructor */ private HdfsConstants() { } - + + /** + * HDFS Protocol Names: + */ + public static final String CLIENT_NAMENODE_PROTOCOL_NAME = + "org.apache.hadoop.hdfs.protocol.ClientProtocol"; + 
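These constants pin the RPC-level protocol name to the original interface FQCNs, so the wire interfaces introduced in protocolR23Compatible can keep registering under the old names via @ProtocolInfo, as ClientDatanodeWireProtocol does later in this patch. A minimal sketch of that binding (hypothetical interface name and version value, not code from this patch):

    import org.apache.hadoop.hdfs.protocol.HdfsConstants;
    import org.apache.hadoop.ipc.ProtocolInfo;
    import org.apache.hadoop.ipc.VersionedProtocol;

    // Hypothetical wire interface: the Java type can live in any package,
    // but on the wire it is identified by the stable name from HdfsConstants.
    @ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME)
    interface ExampleNamenodeWireProtocol extends VersionedProtocol {
      long versionID = 70L; // illustrative value only
    }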
public static final String CLIENT_DATANODE_PROTOCOL_NAME = + "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol"; + + public static int MIN_BLOCKS_FOR_WRITE = 5; // Long that indicates "leave current quota unchanged" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java index 84d0e4c768..4659dd3352 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java @@ -241,6 +241,10 @@ final public Path getFullPath(final Path parent) { final public String getSymlink() { return DFSUtil.bytes2String(symlink); } + + final public byte[] getSymlinkInBytes() { + return symlink; + } ////////////////////////////////////////////////// // Writable diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java index 0eace5a84c..797440d60b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java @@ -54,6 +54,11 @@ public class LocatedBlock implements Writable { public LocatedBlock() { this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false); } + + + public LocatedBlock(ExtendedBlock eb) { + this(eb, new DatanodeInfo[0], 0L, false); + } public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) { this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolProtocolBuffers/overview.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolProtocolBuffers/overview.html new file mode 100644 index 0000000000..6d41cfdf5a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolProtocolBuffers/overview.html @@ -0,0 +1,29 @@ + + + + + Protocol Buffers based data types for NN protocols + + +

+The Protocol Buffers data types for NN protocols that use
+PB go in this package.
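Whatever the serialization (the Writable-based R23 types elsewhere in this patch, or Protocol Buffers here), the layering is the same: wire types carry static convert() helpers and the translator classes call them at the RPC boundary, keeping the serialized form out of the DFSClient/NN/DN internals. A purely illustrative sketch with hypothetical Foo/FooWritable types (the real code uses e.g. ExtendedBlockWritable.convertExtendedBlock):

    // Internal type used inside the client/server code.
    class Foo {
      private final long id;
      Foo(long id) { this.id = id; }
      long getId() { return id; }
    }

    // Wire type: converted to/from the internal type only at the translator layer.
    class FooWritable {
      private long id;
      static FooWritable convert(Foo f) {        // internal -> wire
        if (f == null) return null;
        FooWritable w = new FooWritable();
        w.id = f.getId();
        return w;
      }
      static Foo convert(FooWritable w) {        // wire -> internal
        return w == null ? null : new Foo(w.id);
      }
    }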

+ + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java new file mode 100644 index 0000000000..05297bad22 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; + +/** + * This class is used on the server side. + * Calls come across the wire for the protocol family of Release 23 onwards. + * This class translates the R23 data types to the internal data types used + * inside the DN as specified in the generic ClientDatanodeProtocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientDatanodeProtocolServerSideTranslatorR23 implements + ClientDatanodeWireProtocol { + + final private ClientDatanodeProtocol server; + + /** + * + * @param server - the NN server + * @throws IOException + */ + public ClientDatanodeProtocolServerSideTranslatorR23( + ClientDatanodeProtocol server) throws IOException { + this.server = server; + } + + /** + * the client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call + * getProtocolVersion and possibly in the future getProtocolSignature. + * Hence we still implement it even though the end client's call will + * never reach here. + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and + * signature is that of {@link ClientDatanodeProtocol} + */ + if (!protocol.equals(RPC.getProtocolName( + ClientDatanodeWireProtocol.class))) { + throw new IOException("Datanode Serverside implements " + + ClientDatanodeWireProtocol.class + + ". 
The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + ClientDatanodeWireProtocol.versionID, + ClientDatanodeWireProtocol.class); + } + + @Override + public ProtocolSignatureWritable + getProtocolSignature2( + String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + /** + * Don't forward this to the server. The protocol version and + * signature is that of {@link ClientNamenodeProtocol} + */ + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + if (protocol.equals(RPC.getProtocolName( + ClientDatanodeWireProtocol.class))) { + return ClientDatanodeWireProtocol.versionID; + } + throw new IOException("Datanode Serverside implements " + + ClientDatanodeWireProtocol.class + + ". The following requested protocol is unknown: " + protocol); + } + + @Override + public long getReplicaVisibleLength(ExtendedBlockWritable b) throws IOException { + return + server.getReplicaVisibleLength(ExtendedBlockWritable.convertExtendedBlock(b)); + } + + @Override + public void refreshNamenodes() throws IOException { + server.refreshNamenodes(); + } + + @Override + public void deleteBlockPool(String bpid, boolean force) throws IOException { + server.deleteBlockPool(bpid, force); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java new file mode 100644 index 0000000000..9e384dd95c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import javax.net.SocketFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + + +/** + * This class forwards ClientDatanodeProtocol calls as RPC to the DN server + * while translating from the parameter types used in ClientDatanodeProtocol to + * those used in protocolR23Compatile.*. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientDatanodeProtocolTranslatorR23 implements + ClientDatanodeProtocol { + + final private ClientDatanodeWireProtocol rpcProxy; + + public ClientDatanodeProtocolTranslatorR23(DatanodeID datanodeid, + Configuration conf, int socketTimeout, LocatedBlock locatedBlock) + throws IOException { + rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf, + socketTimeout, locatedBlock); + } + + /** used for testing */ + public ClientDatanodeProtocolTranslatorR23(InetSocketAddress addr, + UserGroupInformation ticket, + Configuration conf, + SocketFactory factory) throws IOException { + rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory); + } + + static ClientDatanodeWireProtocol createClientDatanodeProtocolProxy( + DatanodeID datanodeid, Configuration conf, int socketTimeout, + LocatedBlock locatedBlock) + throws IOException { + InetSocketAddress addr = NetUtils.createSocketAddr( + datanodeid.getHost() + ":" + datanodeid.getIpcPort()); + if (ClientDatanodeWireProtocol.LOG.isDebugEnabled()) { + ClientDatanodeWireProtocol.LOG.debug( + "ClientDatanodeProtocol addr=" + addr); + } + + // Since we're creating a new UserGroupInformation here, we know that no + // future RPC proxies will be able to re-use the same connection. And + // usages of this proxy tend to be one-off calls. + // + // This is a temporary fix: callers should really achieve this by using + // RPC.stopProxy() on the resulting object, but this is currently not + // working in trunk. See the discussion on HDFS-1965. 
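+    // Zero idle time makes the IPC client drop the connection as soon as it
+    // goes idle, so this throwaway proxy does not pin a cached connection.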
+ Configuration confWithNoIpcIdle = new Configuration(conf); + confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic + .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); + + UserGroupInformation ticket = UserGroupInformation + .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString()); + ticket.addToken(locatedBlock.getBlockToken()); + return RPC.getProxy(ClientDatanodeWireProtocol.class, + ClientDatanodeWireProtocol.versionID, addr, ticket, confWithNoIpcIdle, + NetUtils.getDefaultSocketFactory(conf), socketTimeout); + } + + static ClientDatanodeWireProtocol createClientDatanodeProtocolProxy( + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory) throws IOException { + return RPC.getProxy(ClientDatanodeWireProtocol.class, + ClientDatanodeWireProtocol.versionID, addr, ticket, conf, + factory); + } + + @Override + public ProtocolSignature getProtocolSignature( + String protocolName, long clientVersion, int clientMethodHash) + throws IOException { + return ProtocolSignatureWritable.convert( + rpcProxy.getProtocolSignature2( + protocolName, clientVersion, clientMethodHash)); + } + + @Override + public long getProtocolVersion(String protocolName, long clientVersion) + throws IOException { + return rpcProxy.getProtocolVersion(protocolName, clientVersion); + } + + @Override + public long getReplicaVisibleLength(ExtendedBlock b) throws IOException { + return rpcProxy.getReplicaVisibleLength( + ExtendedBlockWritable.convertExtendedBlock(b)); + } + + @Override + public void refreshNamenodes() throws IOException { + rpcProxy.refreshNamenodes(); + + } + + @Override + public void deleteBlockPool(String bpid, boolean force) throws IOException { + rpcProxy.deleteBlockPool(bpid, force); + + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java new file mode 100644 index 0000000000..551d21007c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.TokenInfo; + +/** + * This class defines the actual protocol used to communicate with the + * DN via RPC using writable types. + * The parameters in the methods which are specified in the + * package are separate from those used internally in the DN and DFSClient + * and hence need to be converted using {@link ClientDatanodeProtocolTranslatorR23} + * and {@link ClientDatanodeProtocolServerSideTranslatorR23}. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY) +@TokenInfo(BlockTokenSelector.class) +@ProtocolInfo(protocolName = HdfsConstants.CLIENT_DATANODE_PROTOCOL_NAME) +public interface ClientDatanodeWireProtocol extends VersionedProtocol { + public static final Log LOG = + LogFactory.getLog(ClientDatanodeWireProtocol.class); + + /** + * The rules for changing this protocol are the same as that for + * {@link ClientNamenodeWireProtocol} - see that java file for details. + * 9: Added deleteBlockPool method + * 10 Moved the R23 protocol + */ + public static final long versionID = 10L; + + /** + * The specification of this method matches that of + * + * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol + * #getReplicaVisibleLength(org.apache.hadoop.hdfs.protocol.ExtendedBlock)} + */ + long getReplicaVisibleLength(ExtendedBlockWritable b) throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#refreshNamenodes()} + */ + void refreshNamenodes() throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#deleteBlockPool(String, boolean)} + */ + void deleteBlockPool(String bpid, boolean force) throws IOException; + + /** + * This method is defined to get the protocol signature using + * the R23 protocol - hence we have added the suffix of 2 to the method name + * to avoid conflict. + */ + public org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable + getProtocolSignature2(String protocol, + long clientVersion, + int clientMethodsHash) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolServerSideTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolServerSideTranslatorR23.java new file mode 100644 index 0000000000..e9c053943e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolServerSideTranslatorR23.java @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.token.Token; + +/** + * This class is used on the server side. Calls come across the wire for the + * protocol family of Release 23 onwards. This class translates the R23 data + * types to the native data types used inside the NN as specified in the generic + * ClientProtocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientNamenodeProtocolServerSideTranslatorR23 implements + ClientNamenodeWireProtocol { + final private ClientProtocol server; + + /** + * Constructor + * + * @param server - the NN server + * @throws IOException + */ + public ClientNamenodeProtocolServerSideTranslatorR23(ClientProtocol server) + throws IOException { + this.server = server; + } + + /** + * The client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call getProtocolVersion + * and possibly in the future getProtocolSignature. Hence we still implement + * it even though the end client's call will never reach here. + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. 
The protocol version and signature is + * that of {@link ClientNamenodeProtocol} + * + */ + if (!protocol.equals(RPC.getProtocolName( + ClientNamenodeWireProtocol.class))) { + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(ClientNamenodeWireProtocol.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + ClientNamenodeWireProtocol.versionID, + ClientNamenodeWireProtocol.class); + } + + @Override + public ProtocolSignatureWritable + getProtocolSignature2( + String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link ClientNamenodeProtocol} + * + */ + + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (protocol.equals(RPC.getProtocolName( + ClientNamenodeWireProtocol.class))) { + return ClientNamenodeWireProtocol.versionID; + } + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(ClientNamenodeWireProtocol.class) + + ". The following requested protocol is unknown: " + protocol); + } + + @Override + public LocatedBlocksWritable getBlockLocations( + String src, long offset, long length) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + return LocatedBlocksWritable.convertLocatedBlocks( + server.getBlockLocations(src, offset, length)); + } + + @Override + public FsServerDefaultsWritable getServerDefaults() throws IOException { + return FsServerDefaultsWritable.convert(server.getServerDefaults()); + } + + @Override + public void create(String src, FsPermissionWritable masked, String clientName, + EnumSetWritable flag, boolean createParent, + short replication, long blockSize) throws AccessControlException, + AlreadyBeingCreatedException, DSQuotaExceededException, + FileAlreadyExistsException, FileNotFoundException, + NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, + UnresolvedLinkException, IOException { + server.create(src, FsPermissionWritable.convertPermission(masked), + clientName, flag, createParent, replication, blockSize); + + } + + @Override + public LocatedBlockWritable append(String src, String clientName) + throws AccessControlException, DSQuotaExceededException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + return LocatedBlockWritable.convertLocatedBlock( + server.append(src, clientName)); + } + + @Override + public boolean setReplication(String src, short replication) + throws AccessControlException, DSQuotaExceededException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + return server.setReplication(src, replication); + } + + @Override + public void setPermission(String src, FsPermissionWritable permission) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + server.setPermission(src, + FsPermissionWritable.convertPermission(permission)); + + } + + @Override + public void setOwner(String src, String username, String groupname) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + server.setOwner(src, username, groupname); + + } + + @Override + 
public void abandonBlock(ExtendedBlockWritable b, String src, String holder) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + server.abandonBlock( + ExtendedBlockWritable.convertExtendedBlock(b), src, holder); + + } + + @Override + public LocatedBlockWritable addBlock(String src, String clientName, + ExtendedBlockWritable previous, DatanodeInfoWritable[] excludeNodes) + throws AccessControlException, FileNotFoundException, + NotReplicatedYetException, SafeModeException, UnresolvedLinkException, + IOException { + return LocatedBlockWritable.convertLocatedBlock( + server.addBlock(src, clientName, + ExtendedBlockWritable.convertExtendedBlock(previous), + DatanodeInfoWritable.convertDatanodeInfo(excludeNodes))); + } + + @Override + public LocatedBlockWritable getAdditionalDatanode(String src, ExtendedBlockWritable blk, + DatanodeInfoWritable[] existings, DatanodeInfoWritable[] excludes, + int numAdditionalNodes, String clientName) throws AccessControlException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + return LocatedBlockWritable.convertLocatedBlock( + server.getAdditionalDatanode(src, + ExtendedBlockWritable.convertExtendedBlock(blk), + DatanodeInfoWritable.convertDatanodeInfo(existings), + DatanodeInfoWritable.convertDatanodeInfo(excludes), + numAdditionalNodes, clientName)); + } + + @Override + public boolean complete(String src, String clientName, ExtendedBlockWritable last) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + return server.complete(src, clientName, + ExtendedBlockWritable.convertExtendedBlock(last)); + } + + @Override + public void reportBadBlocks(LocatedBlockWritable[] blocks) throws IOException { + server.reportBadBlocks(LocatedBlockWritable.convertLocatedBlock(blocks)); + + } + + @Override + public boolean rename(String src, String dst) throws UnresolvedLinkException, + IOException { + return server.rename(src, dst); + } + + @Override + public void concat(String trg, String[] srcs) throws IOException, + UnresolvedLinkException { + server.concat(trg, srcs); + + } + + @Override + public void rename2(String src, String dst, Rename... 
options) + throws AccessControlException, DSQuotaExceededException, + FileAlreadyExistsException, FileNotFoundException, + NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, + UnresolvedLinkException, IOException { + server.rename2(src, dst, options); + } + + @Override + public boolean delete(String src, boolean recursive) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + return server.delete(src, recursive); + } + + @Override + public boolean mkdirs(String src, FsPermissionWritable masked, boolean createParent) + throws AccessControlException, FileAlreadyExistsException, + FileNotFoundException, NSQuotaExceededException, + ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, + IOException { + + return server.mkdirs(src, FsPermissionWritable.convertPermission(masked), + createParent); + } + + @Override + public DirectoryListingWritable getListing(String src, byte[] startAfter, + boolean needLocation) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + return DirectoryListingWritable.convertDirectoryListing( + server.getListing(src, startAfter, needLocation)); + } + + @Override + public void renewLease(String clientName) throws AccessControlException, + IOException { + server.renewLease(clientName); + + } + + @Override + public boolean recoverLease(String src, String clientName) throws IOException { + return server.recoverLease(src, clientName); + } + + @Override + public long[] getStats() throws IOException { + return server.getStats(); + } + + @Override + public DatanodeInfoWritable[] getDatanodeReport(DatanodeReportType type) + throws IOException { + return DatanodeInfoWritable + .convertDatanodeInfo(server.getDatanodeReport(type)); + } + + @Override + public long getPreferredBlockSize(String filename) throws IOException, + UnresolvedLinkException { + return server.getPreferredBlockSize(filename); + } + + @Override + public boolean setSafeMode(SafeModeAction action) throws IOException { + return server.setSafeMode(action); + } + + @Override + public void saveNamespace() throws AccessControlException, IOException { + server.saveNamespace(); + + } + + @Override + public boolean restoreFailedStorage(String arg) throws AccessControlException { + return server.restoreFailedStorage(arg); + } + + @Override + public void refreshNodes() throws IOException { + server.refreshNodes(); + + } + + @Override + public void finalizeUpgrade() throws IOException { + server.finalizeUpgrade(); + + } + + @Override + public UpgradeStatusReportWritable distributedUpgradeProgress(UpgradeAction action) + throws IOException { + return UpgradeStatusReportWritable.convert( + server.distributedUpgradeProgress(action)); + } + + @Override + public CorruptFileBlocksWritable listCorruptFileBlocks(String path, String cookie) + throws IOException { + return CorruptFileBlocksWritable.convertCorruptFilesBlocks( + server.listCorruptFileBlocks(path, cookie)); + } + + @Override + public void metaSave(String filename) throws IOException { + server.metaSave(filename); + + } + + @Override + public HdfsFileStatusWritable getFileInfo(String src) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + return HdfsFileStatusWritable.convertHdfsFileStatus( + server.getFileInfo(src)); + } + + @Override + public HdfsFileStatusWritable getFileLinkInfo(String src) + throws AccessControlException, UnresolvedLinkException, IOException { + return 
HdfsFileStatusWritable.convertHdfsFileStatus( + server.getFileLinkInfo(src)); + } + + @Override + public ContentSummaryWritable getContentSummary(String path) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + return ContentSummaryWritable.convert(server.getContentSummary(path)); + } + + @Override + public void setQuota(String path, long namespaceQuota, long diskspaceQuota) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + server.setQuota(path, namespaceQuota, diskspaceQuota); + + } + + @Override + public void fsync(String src, String client) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + server.fsync(src, client); + + } + + @Override + public void setTimes(String src, long mtime, long atime) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + server.setTimes(src, mtime, atime); + + } + + @Override + public void createSymlink(String target, String link, FsPermissionWritable dirPerm, + boolean createParent) throws AccessControlException, + FileAlreadyExistsException, FileNotFoundException, + ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, + IOException { + server.createSymlink(target, link, FsPermissionWritable.convertPermission(dirPerm), + createParent); + + } + + @Override + public String getLinkTarget(String path) throws AccessControlException, + FileNotFoundException, IOException { + return server.getLinkTarget(path); + } + + @Override + public LocatedBlockWritable updateBlockForPipeline(ExtendedBlockWritable block, + String clientName) throws IOException { + return LocatedBlockWritable.convertLocatedBlock( + server.updateBlockForPipeline( + ExtendedBlockWritable.convertExtendedBlock(block), clientName)); + } + + @Override + public void updatePipeline(String clientName, ExtendedBlockWritable oldBlock, + ExtendedBlockWritable newBlock, DatanodeIDWritable[] newNodes) + throws IOException { + server.updatePipeline(clientName, + ExtendedBlockWritable.convertExtendedBlock(oldBlock), + ExtendedBlockWritable.convertExtendedBlock(newBlock), + DatanodeIDWritable.convertDatanodeID(newNodes)); + } + + @Override + public Token getDelegationToken(Text renewer) + throws IOException { + return server.getDelegationToken(renewer); + } + + @Override + public long renewDelegationToken(Token token) + throws IOException { + return server.renewDelegationToken(token); + } + + @Override + public void cancelDelegationToken(Token token) + throws IOException { + server.cancelDelegationToken(token); + } + + @Override + public void setBalancerBandwidth(long bandwidth) throws IOException { + server.setBalancerBandwidth(bandwidth); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolTranslatorR23.java new file mode 100644 index 0000000000..aa1da0a6d4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolTranslatorR23.java @@ -0,0 +1,485 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; + +/** + * This class forwards NN's ClientProtocol calls as RPC calls to the NN server + * while translating from the parameter types used in ClientProtocol to those + * used in protocolR23Compatile.*. 
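+ *
+ * This translator also wraps the raw RPC proxy in a retry proxy so that
+ * create() is retried on AlreadyBeingCreatedException, preserving the retry
+ * behaviour that previously lived in DFSUtil.createNamenode().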
+ */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientNamenodeProtocolTranslatorR23 implements + ClientProtocol, Closeable { + final private ClientNamenodeWireProtocol rpcProxyWithoutRetry; + final private ClientNamenodeWireProtocol rpcProxy; + + private static ClientNamenodeWireProtocol createNamenode( + InetSocketAddress nameNodeAddr, Configuration conf, + UserGroupInformation ugi) throws IOException { + return RPC.getProxy(ClientNamenodeWireProtocol.class, + ClientNamenodeWireProtocol.versionID, nameNodeAddr, ugi, conf, + NetUtils.getSocketFactory(conf, ClientNamenodeWireProtocol.class)); + } + + /** Create a {@link NameNode} proxy */ + static ClientNamenodeWireProtocol createNamenodeWithRetry( + ClientNamenodeWireProtocol rpcNamenode) { + RetryPolicy createPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(5, + HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); + + Map, RetryPolicy> remoteExceptionToPolicyMap = new HashMap, RetryPolicy>(); + remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, + createPolicy); + + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(RemoteException.class, RetryPolicies + .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL, + remoteExceptionToPolicyMap)); + RetryPolicy methodPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + Map methodNameToPolicyMap = new HashMap(); + + methodNameToPolicyMap.put("create", methodPolicy); + + return (ClientNamenodeWireProtocol) RetryProxy.create( + ClientNamenodeWireProtocol.class, rpcNamenode, methodNameToPolicyMap); + } + + public ClientNamenodeProtocolTranslatorR23(InetSocketAddress nameNodeAddr, + Configuration conf, UserGroupInformation ugi) throws IOException { + rpcProxyWithoutRetry = createNamenode(nameNodeAddr, conf, ugi); + rpcProxy = createNamenodeWithRetry(rpcProxyWithoutRetry); + } + + public Object getProxyWithoutRetry() { + return rpcProxyWithoutRetry; + } + + public void close() { + RPC.stopProxy(rpcProxyWithoutRetry); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocolName, + long clientVersion, int clientMethodHash) + throws IOException { + return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2( + protocolName, clientVersion, clientMethodHash)); + } + + @Override + public long getProtocolVersion(String protocolName, long clientVersion) throws IOException { + return rpcProxy.getProtocolVersion(protocolName, clientVersion); + } + + @Override + public LocatedBlocks getBlockLocations(String src, long offset, long length) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + return LocatedBlocksWritable + .convertLocatedBlocks(rpcProxy.getBlockLocations(src, offset, length)); + } + + @Override + public FsServerDefaults getServerDefaults() throws IOException { + return FsServerDefaultsWritable + .convert(rpcProxy.getServerDefaults()); + } + + @Override + public void create(String src, FsPermission masked, String clientName, + EnumSetWritable flag, boolean createParent, + short replication, long blockSize) throws AccessControlException, + AlreadyBeingCreatedException, DSQuotaExceededException, + FileAlreadyExistsException, FileNotFoundException, + NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, + UnresolvedLinkException, IOException { + rpcProxy.create(src, FsPermissionWritable.convertPermission(masked), + clientName, flag, 
createParent, replication, blockSize); + + } + + @Override + public LocatedBlock append(String src, String clientName) + throws AccessControlException, DSQuotaExceededException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + return LocatedBlockWritable + .convertLocatedBlock(rpcProxy.append(src, clientName)); + } + + @Override + public boolean setReplication(String src, short replication) + throws AccessControlException, DSQuotaExceededException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + return rpcProxy.setReplication(src, replication); + } + + @Override + public void setPermission(String src, FsPermission permission) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + rpcProxy.setPermission(src, + FsPermissionWritable.convertPermission(permission)); + + } + + @Override + public void setOwner(String src, String username, String groupname) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + rpcProxy.setOwner(src, username, groupname); + + } + + @Override + public void abandonBlock(ExtendedBlock b, String src, String holder) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + rpcProxy.abandonBlock( + ExtendedBlockWritable.convertExtendedBlock(b), src, holder); + + } + + @Override + public LocatedBlock addBlock(String src, String clientName, + ExtendedBlock previous, DatanodeInfo[] excludeNodes) + throws AccessControlException, FileNotFoundException, + NotReplicatedYetException, SafeModeException, UnresolvedLinkException, + IOException { + return LocatedBlockWritable + .convertLocatedBlock(rpcProxy.addBlock(src, clientName, + ExtendedBlockWritable.convertExtendedBlock(previous), + DatanodeInfoWritable.convertDatanodeInfo(excludeNodes))); + } + + @Override + public LocatedBlock getAdditionalDatanode(String src, ExtendedBlock blk, + DatanodeInfo[] existings, DatanodeInfo[] excludes, + int numAdditionalNodes, String clientName) throws AccessControlException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + return LocatedBlockWritable + .convertLocatedBlock(rpcProxy.getAdditionalDatanode(src, + ExtendedBlockWritable.convertExtendedBlock(blk), + DatanodeInfoWritable.convertDatanodeInfo(existings), + DatanodeInfoWritable.convertDatanodeInfo(excludes), + numAdditionalNodes, clientName)); + } + + @Override + public boolean complete(String src, String clientName, ExtendedBlock last) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + return rpcProxy.complete(src, clientName, + ExtendedBlockWritable.convertExtendedBlock(last)); + } + + @Override + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + rpcProxy.reportBadBlocks(LocatedBlockWritable.convertLocatedBlock(blocks)); + + } + + @Override + public boolean rename(String src, String dst) throws UnresolvedLinkException, + IOException { + return rpcProxy.rename(src, dst); + } + + @Override + public void concat(String trg, String[] srcs) throws IOException, + UnresolvedLinkException { + rpcProxy.concat(trg, srcs); + + } + + @Override + public void rename2(String src, String dst, Rename... 
options) + throws AccessControlException, DSQuotaExceededException, + FileAlreadyExistsException, FileNotFoundException, + NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, + UnresolvedLinkException, IOException { + rpcProxy.rename2(src, dst, options); + + } + + @Override + public boolean delete(String src, boolean recursive) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + return rpcProxy.delete(src, recursive); + } + + @Override + public boolean mkdirs(String src, FsPermission masked, boolean createParent) + throws AccessControlException, FileAlreadyExistsException, + FileNotFoundException, NSQuotaExceededException, + ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, + IOException { + + return rpcProxy.mkdirs(src, + FsPermissionWritable.convertPermission(masked), createParent); + } + + @Override + public DirectoryListing getListing(String src, byte[] startAfter, + boolean needLocation) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + return DirectoryListingWritable.convertDirectoryListing( + rpcProxy.getListing(src, startAfter, needLocation)); + } + + @Override + public void renewLease(String clientName) throws AccessControlException, + IOException { + rpcProxy.renewLease(clientName); + + } + + @Override + public boolean recoverLease(String src, String clientName) throws IOException { + return rpcProxy.recoverLease(src, clientName); + } + + @Override + public long[] getStats() throws IOException { + return rpcProxy.getStats(); + } + + @Override + public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) + throws IOException { + return DatanodeInfoWritable.convertDatanodeInfo( + rpcProxy.getDatanodeReport(type)); + } + + @Override + public long getPreferredBlockSize(String filename) throws IOException, + UnresolvedLinkException { + return rpcProxy.getPreferredBlockSize(filename); + } + + @Override + public boolean setSafeMode(SafeModeAction action) throws IOException { + return rpcProxy.setSafeMode(action); + } + + @Override + public void saveNamespace() throws AccessControlException, IOException { + rpcProxy.saveNamespace(); + + } + + @Override + public boolean restoreFailedStorage(String arg) throws AccessControlException { + return rpcProxy.restoreFailedStorage(arg); + } + + @Override + public void refreshNodes() throws IOException { + rpcProxy.refreshNodes(); + + } + + @Override + public void finalizeUpgrade() throws IOException { + rpcProxy.finalizeUpgrade(); + + } + + @Override + public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) + throws IOException { + return UpgradeStatusReportWritable.convert( + rpcProxy.distributedUpgradeProgress(action)); + } + + @Override + public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) + throws IOException { + return CorruptFileBlocksWritable.convertCorruptFileBlocks( + rpcProxy.listCorruptFileBlocks(path, cookie)); + } + + @Override + public void metaSave(String filename) throws IOException { + rpcProxy.metaSave(filename); + + } + + @Override + public HdfsFileStatus getFileInfo(String src) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + return HdfsFileStatusWritable.convertHdfsFileStatus( + rpcProxy.getFileInfo(src)); + } + + @Override + public HdfsFileStatus getFileLinkInfo(String src) + throws AccessControlException, UnresolvedLinkException, IOException { + return HdfsFileStatusWritable 
+ .convertHdfsFileStatus(rpcProxy.getFileLinkInfo(src)); + } + + @Override + public ContentSummary getContentSummary(String path) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + return ContentSummaryWritable + .convert(rpcProxy.getContentSummary(path)); + } + + @Override + public void setQuota(String path, long namespaceQuota, long diskspaceQuota) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + rpcProxy.setQuota(path, namespaceQuota, diskspaceQuota); + + } + + @Override + public void fsync(String src, String client) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + rpcProxy.fsync(src, client); + + } + + @Override + public void setTimes(String src, long mtime, long atime) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + rpcProxy.setTimes(src, mtime, atime); + + } + + @Override + public void createSymlink(String target, String link, FsPermission dirPerm, + boolean createParent) throws AccessControlException, + FileAlreadyExistsException, FileNotFoundException, + ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, + IOException { + rpcProxy.createSymlink(target, link, + FsPermissionWritable.convertPermission(dirPerm), createParent); + + } + + @Override + public String getLinkTarget(String path) throws AccessControlException, + FileNotFoundException, IOException { + return rpcProxy.getLinkTarget(path); + } + + @Override + public LocatedBlock updateBlockForPipeline(ExtendedBlock block, + String clientName) throws IOException { + return LocatedBlockWritable.convertLocatedBlock( + rpcProxy.updateBlockForPipeline( + ExtendedBlockWritable.convertExtendedBlock(block), clientName)); + } + + @Override + public void updatePipeline(String clientName, ExtendedBlock oldBlock, + ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException { + rpcProxy.updatePipeline(clientName, + ExtendedBlockWritable.convertExtendedBlock(oldBlock), + ExtendedBlockWritable.convertExtendedBlock(newBlock), + DatanodeIDWritable.convertDatanodeID(newNodes)); + + } + + @Override + public Token getDelegationToken(Text renewer) + throws IOException { + return rpcProxy.getDelegationToken(renewer); + } + + @Override + public long renewDelegationToken(Token token) + throws IOException { + return rpcProxy.renewDelegationToken(token); + } + + @Override + public void cancelDelegationToken(Token token) + throws IOException { + rpcProxy.cancelDelegationToken(token); + } + + @Override + public void setBalancerBandwidth(long bandwidth) throws IOException { + rpcProxy.setBalancerBandwidth(bandwidth); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeWireProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeWireProtocol.java new file mode 100644 index 0000000000..c2199e6425 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeWireProtocol.java @@ -0,0 +1,482 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.avro.reflect.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.ipc.ProtocolInfo; + +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; + +/********************************************************************** + * This class defines the actual protocol used to communicate with the + * NN via RPC using writable types. + * The parameters in the methods which are specified in the + * package are separate from those used internally in the NN and DFSClient + * and hence need to be converted using {@link ClientNamenodeProtocolTranslatorR23} + * and {@link ClientNamenodeProtocolServerSideTranslatorR23}. + * + **********************************************************************/ +@InterfaceAudience.Private +@InterfaceStability.Stable +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY) +@TokenInfo(DelegationTokenSelector.class) +@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME) +public interface ClientNamenodeWireProtocol extends VersionedProtocol { + + /** + * Changes to the protocol: + * + * Do NOT change a method's signature (ie name, parameters, parameter types + * or exceptions thrown). If you need to make changes then ADD new methods and + * new data types. + * Hence if you maintain compatibility you will NOT have to change + * the version number below. The version number is changed ONLY + * if you break compatibility (which is a big deal). + * Hence the version number is really a Major Version Number. 
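A hypothetical illustration of that rule (the interface and method names here are invented and not part of this patch): a released wire method keeps its exact signature forever; new behaviour is exposed through an additional method, and versionID is left alone because existing clients keep working.

    import java.io.IOException;

    // Invented example of compatible evolution of a wire interface.
    interface ExampleWireProtocol {
      long versionID = 70L;   // unchanged: the addition below is compatible

      // Released method: its signature must never be altered.
      boolean makeDir(String src) throws IOException;

      // Compatible addition: a NEW method carrying the extra parameter,
      // instead of adding that parameter to makeDir() above.
      boolean makeDirWithParent(String src, boolean createParent) throws IOException;
    }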
+ * + * The log of historical changes prior to 69 can be retrieved from the svn. + * ALL changes since version 69L are recorded. + * Version number is changed ONLY for Incompatible changes. + * (note previously we used to change version number for both + * compatible and incompatible changes). + * 69: Eliminate overloaded method names. (Compatible) + * 70: Separation of Datatypes - the client namenode protocol is implemented + * in this class instead of in + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} + * as was done prior to version 70. + */ + public static final long versionID = 70L; + + /////////////////////////////////////// + // File contents + /////////////////////////////////////// + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getBlockLocations} + */ + @Nullable + public LocatedBlocksWritable getBlockLocations(String src, + long offset, + long length) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getServerDefaults()} + */ + public FsServerDefaultsWritable getServerDefaults() throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#create(String, + * org.apache.hadoop.fs.permission.FsPermission, String, + * EnumSetWritable, boolean, short, long)} + */ + public void create(String src, FsPermissionWritable masked, String clientName, + EnumSetWritable flag, boolean createParent, + short replication, long blockSize) throws AccessControlException, + AlreadyBeingCreatedException, DSQuotaExceededException, + FileAlreadyExistsException, FileNotFoundException, + NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#append(String, String)} + */ + public LocatedBlockWritable append(String src, String clientName) + throws AccessControlException, DSQuotaExceededException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setReplication(String, short)} + */ + public boolean setReplication(String src, short replication) + throws AccessControlException, DSQuotaExceededException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setPermission(String, + * org.apache.hadoop.fs.permission.FsPermission)} + */ + public void setPermission(String src, FsPermissionWritable permission) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setOwner(String, String, String)} + */ + public void setOwner(String src, String username, String groupname) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#abandonBlock( + * 
org.apache.hadoop.hdfs.protocol.ExtendedBlock, String, String)} + */ + public void abandonBlock(ExtendedBlockWritable b, String src, String holder) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#addBlock(String, + * String, org.apache.hadoop.hdfs.protocol.ExtendedBlock, + * org.apache.hadoop.hdfs.protocol.DatanodeInfo[])} + */ + public LocatedBlockWritable addBlock(String src, String clientName, + @Nullable ExtendedBlockWritable previous, @Nullable DatanodeInfoWritable[] excludeNodes) + throws AccessControlException, FileNotFoundException, + NotReplicatedYetException, SafeModeException, UnresolvedLinkException, + IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getAdditionalDatanode} + */ + public LocatedBlockWritable getAdditionalDatanode( + final String src, final ExtendedBlockWritable blk, + final DatanodeInfoWritable[] existings, + final DatanodeInfoWritable[] excludes, + final int numAdditionalNodes, final String clientName + ) throws AccessControlException, FileNotFoundException, + SafeModeException, UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#complete} + */ + public boolean complete( + String src, String clientName, ExtendedBlockWritable last) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#reportBadBlocks} + */ + public void reportBadBlocks(LocatedBlockWritable[] blocks) throws IOException; + + /////////////////////////////////////// + // Namespace management + /////////////////////////////////////// + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#rename(String, String)} + */ + public boolean rename(String src, String dst) + throws UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#concat(String, String[])} + */ + public void concat(String trg, String[] srcs) + throws IOException, UnresolvedLinkException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#rename2} + */ + public void rename2(String src, String dst, Options.Rename... 
options) + throws AccessControlException, DSQuotaExceededException, + FileAlreadyExistsException, FileNotFoundException, + NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#delete(String, boolean)} + */ + public boolean delete(String src, boolean recursive) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#mkdirs} + */ + public boolean mkdirs( + String src, FsPermissionWritable masked, boolean createParent) + throws AccessControlException, FileAlreadyExistsException, + FileNotFoundException, NSQuotaExceededException, + ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, + IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getListing} + */ + public DirectoryListingWritable getListing(String src, + byte[] startAfter, + boolean needLocation) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException; + + /////////////////////////////////////// + // System issues and management + /////////////////////////////////////// + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#renewLease(String)} + */ + public void renewLease(String clientName) throws AccessControlException, + IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#recoverLease(String, String)} + */ + public boolean recoverLease(String src, String clientName) throws IOException; + + public int GET_STATS_CAPACITY_IDX = 0; + public int GET_STATS_USED_IDX = 1; + public int GET_STATS_REMAINING_IDX = 2; + public int GET_STATS_UNDER_REPLICATED_IDX = 3; + public int GET_STATS_CORRUPT_BLOCKS_IDX = 4; + public int GET_STATS_MISSING_BLOCKS_IDX = 5; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getStats()} + */ + public long[] getStats() throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getDatanodeReport} + */ + public DatanodeInfoWritable[] getDatanodeReport( + HdfsConstants.DatanodeReportType type) + throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getPreferredBlockSize} + */ + public long getPreferredBlockSize(String filename) + throws IOException, UnresolvedLinkException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction)} + */ + public boolean setSafeMode(HdfsConstants.SafeModeAction action) + throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#saveNamespace()} + */ + public void saveNamespace() throws AccessControlException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String)} + */ + public boolean 
restoreFailedStorage(String arg) throws AccessControlException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#refreshNodes()} + */ + public void refreshNodes() throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#finalizeUpgrade()} + */ + public void finalizeUpgrade() throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#distributedUpgradeProgress} + */ + @Nullable + public UpgradeStatusReportWritable distributedUpgradeProgress( + UpgradeAction action) + throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#listCorruptFileBlocks(String, String)} + */ + public CorruptFileBlocksWritable + listCorruptFileBlocks(String path, String cookie) + throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#metaSave(String)} + */ + public void metaSave(String filename) throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setBalancerBandwidth(long)} + */ + public void setBalancerBandwidth(long bandwidth) throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getFileInfo(String)} + */ + @Nullable + public HdfsFileStatusWritable getFileInfo(String src) + throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getFileLinkInfo(String)} + */ + public HdfsFileStatusWritable getFileLinkInfo(String src) + throws AccessControlException, UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getContentSummary(String)} + */ + public ContentSummaryWritable getContentSummary(String path) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long)} + */ + public void setQuota(String path, long namespaceQuota, long diskspaceQuota) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#fsync(String, String)} + */ + public void fsync(String src, String client) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setTimes(String, long, long)} + */ + public void setTimes(String src, long mtime, long atime) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#createSymlink} + */ + public void createSymlink( + String target, String link, FsPermissionWritable dirPerm, + boolean createParent) throws AccessControlException, + 
FileAlreadyExistsException, FileNotFoundException, + ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, + IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getLinkTarget(String)} + */ + public String getLinkTarget(String path) throws AccessControlException, + FileNotFoundException, IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#updateBlockForPipeline} + */ + public LocatedBlockWritable updateBlockForPipeline( + ExtendedBlockWritable block, String clientName) throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#updatePipeline} + */ + public void updatePipeline(String clientName, ExtendedBlockWritable oldBlock, + ExtendedBlockWritable newBlock, DatanodeIDWritable[] newNodes) + throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getDelegationToken(Text)} + */ + public Token getDelegationToken(Text renewer) + throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#renewDelegationToken(Token)} + */ + public long renewDelegationToken(Token token) + throws IOException; + + /** + * The specification of this method matches that of + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#cancelDelegationToken(Token)} + */ + public void cancelDelegationToken(Token token) + throws IOException; + + /** + * This method is defined to get the protocol signature using + * the R23 protocol - hence we have added the suffix of 2 the method name + * to avoid conflict. + */ + public org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable + getProtocolSignature2(String protocol, + long clientVersion, + int clientMethodsHash) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ContentSummaryWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ContentSummaryWritable.java new file mode 100644 index 0000000000..95bb0c552a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ContentSummaryWritable.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Writable; + +/** Store the summary of a content (a directory or a file). */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class ContentSummaryWritable implements Writable{ + private long length; + private long fileCount; + private long directoryCount; + private long quota; + private long spaceConsumed; + private long spaceQuota; + + + public static org.apache.hadoop.fs.ContentSummary convert(ContentSummaryWritable cs) { + if (cs == null) return null; + return new org.apache.hadoop.fs.ContentSummary( + cs.getLength(), cs.getFileCount(), cs.getDirectoryCount(), cs.getQuota(), + cs.getSpaceConsumed(), cs.getSpaceQuota()); + } + + public static ContentSummaryWritable convert(org.apache.hadoop.fs.ContentSummary cs) { + if (cs == null) return null; + return new ContentSummaryWritable( + cs.getLength(), cs.getFileCount(), cs.getDirectoryCount(), cs.getQuota(), + cs.getSpaceConsumed(), cs.getSpaceQuota()); + } + + /** Constructor */ + public ContentSummaryWritable() {} + + /** Constructor */ + public ContentSummaryWritable(long length, long fileCount, long directoryCount) { + this(length, fileCount, directoryCount, -1L, length, -1L); + } + + /** Constructor */ + public ContentSummaryWritable( + long length, long fileCount, long directoryCount, long quota, + long spaceConsumed, long spaceQuota) { + this.length = length; + this.fileCount = fileCount; + this.directoryCount = directoryCount; + this.quota = quota; + this.spaceConsumed = spaceConsumed; + this.spaceQuota = spaceQuota; + } + + /** @return the length */ + public long getLength() {return length;} + + /** @return the directory count */ + public long getDirectoryCount() {return directoryCount;} + + /** @return the file count */ + public long getFileCount() {return fileCount;} + + /** Return the directory quota */ + public long getQuota() {return quota;} + + /** Retuns (disk) space consumed */ + public long getSpaceConsumed() {return spaceConsumed;} + + /** Returns (disk) space quota */ + public long getSpaceQuota() {return spaceQuota;} + + @InterfaceAudience.Private + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(length); + out.writeLong(fileCount); + out.writeLong(directoryCount); + out.writeLong(quota); + out.writeLong(spaceConsumed); + out.writeLong(spaceQuota); + } + + @InterfaceAudience.Private + @Override + public void readFields(DataInput in) throws IOException { + this.length = in.readLong(); + this.fileCount = in.readLong(); + this.directoryCount = in.readLong(); + this.quota = in.readLong(); + this.spaceConsumed = in.readLong(); + this.spaceQuota = in.readLong(); + } + + /** + * Output format: + * <----12----> <----12----> <-------18-------> + * DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME + */ + private static final String STRING_FORMAT = "%12d %12d %18d "; + /** + * Output format: + * <----12----> <----15----> <----15----> <----15----> <----12----> <----12----> <-------18-------> + * QUOTA REMAINING_QUATA SPACE_QUOTA SPACE_QUOTA_REM DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME + */ + private static final String QUOTA_STRING_FORMAT = "%12s %15s "; + private static final String SPACE_QUOTA_STRING_FORMAT = "%15s %15s "; + + /** The header string */ + private static final String HEADER = 
String.format( + STRING_FORMAT.replace('d', 's'), "directories", "files", "bytes"); + + private static final String QUOTA_HEADER = String.format( + QUOTA_STRING_FORMAT + SPACE_QUOTA_STRING_FORMAT, + "quota", "remaining quota", "space quota", "reamaining quota") + + HEADER; + + /** Return the header of the output. + * if qOption is false, output directory count, file count, and content size; + * if qOption is true, output quota and remaining quota as well. + * + * @param qOption a flag indicating if quota needs to be printed or not + * @return the header of the output + */ + public static String getHeader(boolean qOption) { + return qOption ? QUOTA_HEADER : HEADER; + } + + @Override + public String toString() { + return toString(true); + } + + /** Return the string representation of the object in the output format. + * if qOption is false, output directory count, file count, and content size; + * if qOption is true, output quota and remaining quota as well. + * + * @param qOption a flag indicating if quota needs to be printed or not + * @return the string representation of the object + */ + public String toString(boolean qOption) { + String prefix = ""; + if (qOption) { + String quotaStr = "none"; + String quotaRem = "inf"; + String spaceQuotaStr = "none"; + String spaceQuotaRem = "inf"; + + if (quota>0) { + quotaStr = Long.toString(quota); + quotaRem = Long.toString(quota-(directoryCount+fileCount)); + } + if (spaceQuota>0) { + spaceQuotaStr = Long.toString(spaceQuota); + spaceQuotaRem = Long.toString(spaceQuota - spaceConsumed); + } + + prefix = String.format(QUOTA_STRING_FORMAT + SPACE_QUOTA_STRING_FORMAT, + quotaStr, quotaRem, spaceQuotaStr, spaceQuotaRem); + } + + return prefix + String.format(STRING_FORMAT, directoryCount, + fileCount, length); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/CorruptFileBlocksWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/CorruptFileBlocksWritable.java new file mode 100644 index 0000000000..8f59046761 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/CorruptFileBlocksWritable.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.Text; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Contains a list of paths corresponding to corrupt files and a cookie + * used for iterative calls to NameNode.listCorruptFileBlocks. 
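A minimal sketch of the cookie-driven iteration described above, written against the ClientProtocol side that the translator exposes. The helper class and the use of a null cookie for the first batch are illustrative assumptions, not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;

    // Illustrative only: print every corrupt file path under 'path' by feeding the
    // cookie from each response back into the next listCorruptFileBlocks call.
    class CorruptFileLister {
      static void printCorruptFiles(ClientProtocol namenode, String path)
          throws IOException {
        String cookie = null;                       // assumed to mean "start from the beginning"
        while (true) {
          CorruptFileBlocks chunk = namenode.listCorruptFileBlocks(path, cookie);
          if (chunk.getFiles().length == 0) {
            break;                                  // no more corrupt files to report
          }
          for (String file : chunk.getFiles()) {
            System.out.println(file);
          }
          cookie = chunk.getCookie();               // resume point for the next call
        }
      }
    }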
+ * + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class CorruptFileBlocksWritable implements Writable { + + private String[] files; + private String cookie; + + static public org.apache.hadoop.hdfs.protocol.CorruptFileBlocks + convertCorruptFileBlocks(CorruptFileBlocksWritable c) { + if (c == null) return null; + return new org.apache.hadoop.hdfs.protocol.CorruptFileBlocks( + c.getFiles(), c.getCookie()); + } + + public static CorruptFileBlocksWritable convertCorruptFilesBlocks( + org.apache.hadoop.hdfs.protocol.CorruptFileBlocks c) { + if (c == null) return null; + return new CorruptFileBlocksWritable(c.getFiles(), c.getCookie()); + } + + public CorruptFileBlocksWritable() { + this(new String[0], ""); + } + + public CorruptFileBlocksWritable(String[] files, String cookie) { + this.files = files; + this.cookie = cookie; + } + + public String[] getFiles() { + return files; + } + + public String getCookie() { + return cookie; + } + + @Override + public void readFields(DataInput in) throws IOException { + int fileCount = in.readInt(); + files = new String[fileCount]; + for (int i = 0; i < fileCount; i++) { + files[i] = Text.readString(in); + } + cookie = Text.readString(in); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(files.length); + for (int i = 0; i < files.length; i++) { + Text.writeString(out, files[i]); + } + Text.writeString(out, cookie); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DatanodeIDWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DatanodeIDWritable.java new file mode 100644 index 0000000000..67db5f3935 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DatanodeIDWritable.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DeprecatedUTF8; +import org.apache.hadoop.io.Writable; + +/** + * DatanodeID is composed of the data node + * name (hostname:portNumber) and the data storage ID, + * which it currently represents. 
+ * + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class DatanodeIDWritable implements Writable { + public static final DatanodeIDWritable[] EMPTY_ARRAY = {}; + + public String name; /// hostname:portNumber + public String storageID; /// unique per cluster storageID + protected int infoPort; /// the port where the infoserver is running + public int ipcPort; /// the port where the ipc server is running + + + static public DatanodeIDWritable[] + convertDatanodeID(org.apache.hadoop.hdfs.protocol.DatanodeID[] did) { + if (did == null) return null; + final int len = did.length; + DatanodeIDWritable[] result = new DatanodeIDWritable[len]; + for (int i = 0; i < len; ++i) { + result[i] = convertDatanodeID(did[i]); + } + return result; + } + + static public org.apache.hadoop.hdfs.protocol.DatanodeID[] + convertDatanodeID(DatanodeIDWritable[] did) { + if (did == null) return null; + final int len = did.length; + org.apache.hadoop.hdfs.protocol.DatanodeID[] result = new org.apache.hadoop.hdfs.protocol.DatanodeID[len]; + for (int i = 0; i < len; ++i) { + result[i] = convertDatanodeID(did[i]); + } + return result; + } + + static public org.apache.hadoop.hdfs.protocol.DatanodeID convertDatanodeID( + DatanodeIDWritable did) { + if (did == null) return null; + return new org.apache.hadoop.hdfs.protocol.DatanodeID( + did.getName(), did.getStorageID(), did.getInfoPort(), did.getIpcPort()); + + } + + public static DatanodeIDWritable convertDatanodeID(org.apache.hadoop.hdfs.protocol.DatanodeID from) { + return new DatanodeIDWritable(from.getName(), + from.getStorageID(), + from.getInfoPort(), + from.getIpcPort()); + } + + /** Equivalent to DatanodeID(""). */ + public DatanodeIDWritable() {this("");} + + /** Equivalent to DatanodeID(nodeName, "", -1, -1). */ + public DatanodeIDWritable(String nodeName) {this(nodeName, "", -1, -1);} + + /** + * DatanodeID copy constructor + * + * @param from + */ + public DatanodeIDWritable(DatanodeIDWritable from) { + this(from.getName(), + from.getStorageID(), + from.getInfoPort(), + from.getIpcPort()); + } + + /** + * Create DatanodeID + * @param nodeName (hostname:portNumber) + * @param storageID data storage ID + * @param infoPort info server port + * @param ipcPort ipc server port + */ + public DatanodeIDWritable(String nodeName, String storageID, + int infoPort, int ipcPort) { + this.name = nodeName; + this.storageID = storageID; + this.infoPort = infoPort; + this.ipcPort = ipcPort; + } + + public void setName(String name) { + this.name = name; + } + + public void setInfoPort(int infoPort) { + this.infoPort = infoPort; + } + + public void setIpcPort(int ipcPort) { + this.ipcPort = ipcPort; + } + + /** + * @return hostname:portNumber. + */ + public String getName() { + return name; + } + + /** + * @return data storage ID. + */ + public String getStorageID() { + return this.storageID; + } + + /** + * @return infoPort (the port at which the HTTP server bound to) + */ + public int getInfoPort() { + return infoPort; + } + + /** + * @return ipcPort (the port at which the IPC server bound to) + */ + public int getIpcPort() { + return ipcPort; + } + + /** + * sets the data storage ID. + */ + public void setStorageID(String storageID) { + this.storageID = storageID; + } + + /** + * @return hostname and no :portNumber. 
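For reference, getHost() and getPort() (defined just below) split the composite name on its first ':'; when the name carries no port suffix, getPort() falls back to the datanode default of 50010. A small illustration using made-up host names:

    import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable;

    class DatanodeNameExample {
      public static void main(String[] args) {
        DatanodeIDWritable withPort = new DatanodeIDWritable("dn1.example.com:50010");
        DatanodeIDWritable hostOnly = new DatanodeIDWritable("dn1.example.com");

        System.out.println(withPort.getHost());   // dn1.example.com
        System.out.println(withPort.getPort());   // 50010 (parsed from the name)
        System.out.println(hostOnly.getHost());   // dn1.example.com
        System.out.println(hostOnly.getPort());   // 50010 (hard-coded default)
      }
    }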
+ */ + public String getHost() { + int colon = name.indexOf(":"); + if (colon < 0) { + return name; + } else { + return name.substring(0, colon); + } + } + + public int getPort() { + int colon = name.indexOf(":"); + if (colon < 0) { + return 50010; // default port. + } + return Integer.parseInt(name.substring(colon+1)); + } + + + public String toString() { + return name; + } + + ///////////////////////////////////////////////// + // Writable + ///////////////////////////////////////////////// + @Override + public void write(DataOutput out) throws IOException { + DeprecatedUTF8.writeString(out, name); + DeprecatedUTF8.writeString(out, storageID); + out.writeShort(infoPort); + } + + @Override + public void readFields(DataInput in) throws IOException { + name = DeprecatedUTF8.readString(in); + storageID = DeprecatedUTF8.readString(in); + // the infoPort read could be negative, if the port is a large number (more + // than 15 bits in storage size (but less than 16 bits). + // So chop off the first two bytes (and hence the signed bits) before + // setting the field. + this.infoPort = in.readShort() & 0x0000ffff; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DatanodeInfoWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DatanodeInfoWritable.java new file mode 100644 index 0000000000..4b6918964c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DatanodeInfoWritable.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.HadoopIllegalArgumentException; + +import org.apache.avro.reflect.Nullable; + +/** + * DatanodeInfo represents the status of a DataNode. + * This object is used for communication in the + * Datanode Protocol and the Client Protocol. 
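To show how this writable is bridged to the internal type, here is a small usage sketch of the convertDatanodeInfo helper defined below; the host name, storage ID and rack values are made up:

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable;
    import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeInfoWritable;

    class DatanodeInfoConversionExample {
      public static void main(String[] args) {
        // Wire-side object as it would arrive over the r23 protocol (made-up values).
        DatanodeInfoWritable wire = new DatanodeInfoWritable(
            new DatanodeIDWritable("dn1.example.com:50010", "DS-42-example", 50075, 50020));
        wire.setNetworkLocation("/rack-a");

        // Convert to the type used internally by the NN and DFSClient.
        DatanodeInfo internal = DatanodeInfoWritable.convertDatanodeInfo(wire);
        System.out.println(internal.getName() + " @ " + internal.getNetworkLocation());
      }
    }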
+ */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class DatanodeInfoWritable extends DatanodeIDWritable { + protected long capacity; + protected long dfsUsed; + protected long remaining; + protected long blockPoolUsed; + protected long lastUpdate; + protected int xceiverCount; + protected String location = NetworkTopology.DEFAULT_RACK; + + /** HostName as supplied by the datanode during registration as its + * name. Namenode uses datanode IP address as the name. + */ + @Nullable + protected String hostName = null; + + // administrative states of a datanode + public enum AdminStates { + NORMAL(org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates.NORMAL.toString()), + DECOMMISSION_INPROGRESS(org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates.DECOMMISSION_INPROGRESS.toString()), + DECOMMISSIONED(org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates.DECOMMISSIONED.toString()); + + final String value; + + AdminStates(final String v) { + this.value = v; + } + + public String toString() { + return value; + } + + public static AdminStates fromValue(final String value) { + for (AdminStates as : AdminStates.values()) { + if (as.value.equals(value)) return as; + } + throw new HadoopIllegalArgumentException("Unknown Admin State" + value); + } + } + + @Nullable + protected AdminStates adminState; + + static public org.apache.hadoop.hdfs.protocol.DatanodeInfo convertDatanodeInfo(DatanodeInfoWritable di) { + if (di == null) return null; + return new org.apache.hadoop.hdfs.protocol.DatanodeInfo( + new org.apache.hadoop.hdfs.protocol.DatanodeID(di.getName(), di.getStorageID(), di.getInfoPort(), di.getIpcPort()), + di.getNetworkLocation(), di.getHostName(), + di.getCapacity(), di.getDfsUsed(), di.getRemaining(), + di.getBlockPoolUsed() , di.getLastUpdate() , di.getXceiverCount() , + org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates.fromValue(di.getAdminState().value)); + } + + + static public org.apache.hadoop.hdfs.protocol.DatanodeInfo[] convertDatanodeInfo(DatanodeInfoWritable di[]) { + if (di == null) return null; + org.apache.hadoop.hdfs.protocol.DatanodeInfo[] result = new org.apache.hadoop.hdfs.protocol.DatanodeInfo[di.length]; + for (int i = 0; i < di.length; i++) { + result[i] = convertDatanodeInfo(di[i]); + } + return result; + } + + static public DatanodeInfoWritable[] convertDatanodeInfo(org.apache.hadoop.hdfs.protocol.DatanodeInfo[] di) { + if (di == null) return null; + DatanodeInfoWritable[] result = new DatanodeInfoWritable[di.length]; + for (int i = 0; i < di.length; i++) { + result[i] = new DatanodeInfoWritable(new DatanodeIDWritable(di[i].getName(), di[i].getStorageID(), di[i].getInfoPort(), di[i].getIpcPort()), + di[i].getNetworkLocation(), di[i].getHostName(), + di[i].getCapacity(), di[i].getDfsUsed(), di[i].getRemaining(), + di[i].getBlockPoolUsed() , di[i].getLastUpdate() , di[i].getXceiverCount() , + AdminStates.fromValue(di[i].getAdminState().toString())); + } + return result; + + } + + public DatanodeInfoWritable() { + super(); + adminState = null; + } + + public DatanodeInfoWritable(DatanodeInfoWritable from) { + super(from); + this.capacity = from.getCapacity(); + this.dfsUsed = from.getDfsUsed(); + this.remaining = from.getRemaining(); + this.blockPoolUsed = from.getBlockPoolUsed(); + this.lastUpdate = from.getLastUpdate(); + this.xceiverCount = from.getXceiverCount(); + this.location = from.getNetworkLocation(); + this.adminState = from.adminState; + this.hostName = from.hostName; + } + + public DatanodeInfoWritable(DatanodeIDWritable 
nodeID) { + super(nodeID); + this.capacity = 0L; + this.dfsUsed = 0L; + this.remaining = 0L; + this.blockPoolUsed = 0L; + this.lastUpdate = 0L; + this.xceiverCount = 0; + this.adminState = null; + } + + protected DatanodeInfoWritable(DatanodeIDWritable nodeID, String location, String hostName) { + this(nodeID); + this.location = location; + this.hostName = hostName; + } + + public DatanodeInfoWritable(DatanodeIDWritable nodeID, String location, String hostName, + final long capacity, final long dfsUsed, final long remaining, + final long blockPoolUsed, final long lastUpdate, final int xceiverCount, + final AdminStates adminState) { + this(nodeID, location, hostName); + this.capacity = capacity; + this.dfsUsed = dfsUsed; + this.remaining = remaining; + this.blockPoolUsed = blockPoolUsed; + this.lastUpdate = lastUpdate; + this.xceiverCount = xceiverCount; + this.adminState = adminState; + } + + /** The raw capacity. */ + public long getCapacity() { return capacity; } + + /** The used space by the data node. */ + public long getDfsUsed() { return dfsUsed; } + + /** The used space by the block pool on data node. */ + public long getBlockPoolUsed() { return blockPoolUsed; } + + /** The used space by the data node. */ + public long getNonDfsUsed() { + long nonDFSUsed = capacity - dfsUsed - remaining; + return nonDFSUsed < 0 ? 0 : nonDFSUsed; + } + + /** The used space by the data node as percentage of present capacity */ + public float getDfsUsedPercent() { + return DFSUtil.getPercentUsed(dfsUsed, capacity); + } + + /** The raw free space. */ + public long getRemaining() { return remaining; } + + /** Used space by the block pool as percentage of present capacity */ + public float getBlockPoolUsedPercent() { + return DFSUtil.getPercentUsed(blockPoolUsed, capacity); + } + + /** The remaining space as percentage of configured capacity. */ + public float getRemainingPercent() { + return DFSUtil.getPercentRemaining(remaining, capacity); + } + + /** The time when this information was accurate. */ + public long getLastUpdate() { return lastUpdate; } + + /** number of active connections */ + public int getXceiverCount() { return xceiverCount; } + + /** Sets raw capacity. */ + public void setCapacity(long capacity) { + this.capacity = capacity; + } + + /** Sets the used space for the datanode. */ + public void setDfsUsed(long dfsUsed) { + this.dfsUsed = dfsUsed; + } + + /** Sets raw free space. */ + public void setRemaining(long remaining) { + this.remaining = remaining; + } + + /** Sets block pool used space */ + public void setBlockPoolUsed(long bpUsed) { + this.blockPoolUsed = bpUsed; + } + + /** Sets time when this information was accurate. */ + public void setLastUpdate(long lastUpdate) { + this.lastUpdate = lastUpdate; + } + + /** Sets number of active connections */ + public void setXceiverCount(int xceiverCount) { + this.xceiverCount = xceiverCount; + } + + /** rack name */ + public String getNetworkLocation() {return location;} + + /** Sets the rack name */ + public void setNetworkLocation(String location) { + this.location = NodeBase.normalize(location); + } + + public String getHostName() { + return (hostName == null || hostName.length()==0) ? getHost() : hostName; + } + + public void setHostName(String host) { + hostName = host; + } + + /** + * Retrieves the admin state of this node. + */ + public AdminStates getAdminState() { + if (adminState == null) { + return AdminStates.NORMAL; + } + return adminState; + } + + /** + * Sets the admin state of this node. 
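A quick worked example of the space accounting exposed by the getters above, using made-up figures: with 100 GB capacity, 40 GB used by DFS and 50 GB remaining, non-DFS usage is 100 - 40 - 50 = 10 GB (clamped to 0 if the report is inconsistent), DFS usage is 40% of capacity and 50% remains.

    import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeInfoWritable;

    class SpaceAccountingExample {
      public static void main(String[] args) {
        final long GB = 1024L * 1024 * 1024;
        DatanodeInfoWritable d = new DatanodeInfoWritable();
        d.setCapacity(100 * GB);    // made-up figures
        d.setDfsUsed(40 * GB);
        d.setRemaining(50 * GB);

        System.out.println(d.getNonDfsUsed() / GB);   // 10 (capacity - dfsUsed - remaining)
        System.out.println(d.getDfsUsedPercent());    // 40.0 (percent of capacity)
        System.out.println(d.getRemainingPercent());  // 50.0 (percent of capacity)
      }
    }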
+ */ + protected void setAdminState(AdminStates newState) { + if (newState == AdminStates.NORMAL) { + adminState = null; + } + else { + adminState = newState; + } + } + + ///////////////////////////////////////////////// + // Writable + ///////////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory + (DatanodeInfoWritable.class, + new WritableFactory() { + public Writable newInstance() { return new DatanodeInfoWritable(); } + }); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + + out.writeShort(ipcPort); + + out.writeLong(capacity); + out.writeLong(dfsUsed); + out.writeLong(remaining); + out.writeLong(blockPoolUsed); + out.writeLong(lastUpdate); + out.writeInt(xceiverCount); + Text.writeString(out, location); + Text.writeString(out, hostName == null? "" : hostName); + WritableUtils.writeEnum(out, getAdminState()); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + this.ipcPort = in.readShort() & 0x0000ffff; + + this.capacity = in.readLong(); + this.dfsUsed = in.readLong(); + this.remaining = in.readLong(); + this.blockPoolUsed = in.readLong(); + this.lastUpdate = in.readLong(); + this.xceiverCount = in.readInt(); + this.location = Text.readString(in); + this.hostName = Text.readString(in); + setAdminState(WritableUtils.readEnum(in, AdminStates.class)); + } + + /** Read a DatanodeInfo */ + public static DatanodeInfoWritable read(DataInput in) throws IOException { + final DatanodeInfoWritable d = new DatanodeInfoWritable(); + d.readFields(in); + return d; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DirectoryListingWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DirectoryListingWritable.java new file mode 100644 index 0000000000..aee4bc928c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/DirectoryListingWritable.java @@ -0,0 +1,157 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * This class defines a partial listing of a directory to support + * iterative directory listing. 
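The iteration that this partial-listing type enables can be sketched as the following client-side loop against ClientProtocol. The loop itself is illustrative; it assumes getListing may return null for a vanished directory and uses HdfsFileStatus.EMPTY_NAME as the starting cursor:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.DirectoryListing;
    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

    // Illustrative only: print every entry of a directory, one partial listing at a time.
    class ListingExample {
      static void listAll(ClientProtocol namenode, String dir) throws IOException {
        byte[] startAfter = HdfsFileStatus.EMPTY_NAME;   // start at the beginning
        DirectoryListing chunk;
        do {
          chunk = namenode.getListing(dir, startAfter, false);
          if (chunk == null) {
            return;                                      // directory no longer exists
          }
          for (HdfsFileStatus stat : chunk.getPartialListing()) {
            System.out.println(stat.getLocalName());
          }
          startAfter = chunk.getLastName();              // cursor for the next chunk
        } while (chunk.hasMore());
      }
    }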
+ */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class DirectoryListingWritable implements Writable { + static { // register a ctor + WritableFactories.setFactory + (DirectoryListingWritable.class, + new WritableFactory() { + public Writable newInstance() { return new DirectoryListingWritable(); } + }); + } + + private HdfsFileStatusWritable[] partialListing; + private int remainingEntries; + + public static org.apache.hadoop.hdfs.protocol.DirectoryListing + convertDirectoryListing(DirectoryListingWritable dl) { + if (dl == null) return null; + return new org.apache.hadoop.hdfs.protocol.DirectoryListing( + HdfsFileStatusWritable.convertHdfsFileStatus( + dl.getPartialListing()), dl.getRemainingEntries()); + } + + public static DirectoryListingWritable convertDirectoryListing( + org.apache.hadoop.hdfs.protocol.DirectoryListing d) { + if (d == null) return null; + return new DirectoryListingWritable( + org.apache.hadoop.hdfs.protocolR23Compatible.HdfsFileStatusWritable. + convertHdfsFileStatus(d.getPartialListing()), d.getRemainingEntries()); + } + + /** + * default constructor + */ + public DirectoryListingWritable() { + } + + /** + * constructor + * @param partialListing a partial listing of a directory + * @param remainingEntries number of entries that are left to be listed + */ + public DirectoryListingWritable(HdfsFileStatusWritable[] partialListing, + int remainingEntries) { + if (partialListing == null) { + throw new IllegalArgumentException("partial listing should not be null"); + } + if (partialListing.length == 0 && remainingEntries != 0) { + throw new IllegalArgumentException("Partial listing is empty but " + + "the number of remaining entries is not zero"); + } + this.partialListing = partialListing; + this.remainingEntries = remainingEntries; + } + + /** + * Get the partial listing of file status + * @return the partial listing of file status + */ + public HdfsFileStatusWritable[] getPartialListing() { + return partialListing; + } + + /** + * Get the number of remaining entries that are left to be listed + * @return the number of remaining entries that are left to be listed + */ + public int getRemainingEntries() { + return remainingEntries; + } + + /** + * Check if there are more entries that are left to be listed + * @return true if there are more entries that are left to be listed; + * return false otherwise. 
+ */ + public boolean hasMore() { + return remainingEntries != 0; + } + + /** + * Get the last name in this list + * @return the last name in the list if it is not empty; otherwise return null + */ + public byte[] getLastName() { + if (partialListing.length == 0) { + return null; + } + return partialListing[partialListing.length-1].getLocalNameInBytes(); + } + + // Writable interface + @Override + public void readFields(DataInput in) throws IOException { + int numEntries = in.readInt(); + partialListing = new HdfsFileStatusWritable[numEntries]; + if (numEntries !=0 ) { + boolean hasLocation = in.readBoolean(); + for (int i=0; i( + tok.getIdentifier(), tok.getPassword(), tok.getKind(), + tok.getService())); + return result; + } + + public static LocatedBlockWritable + convertLocatedBlock(org.apache.hadoop.hdfs.protocol.LocatedBlock lb) { + if (lb == null) return null; + LocatedBlockWritable result = + new LocatedBlockWritable(ExtendedBlockWritable.convertExtendedBlock(lb.getBlock()), + DatanodeInfoWritable.convertDatanodeInfo(lb.getLocations()), + lb.getStartOffset(), lb.isCorrupt()); + + // Fill in the token + org.apache.hadoop.security.token.Token tok = + lb.getBlockToken(); + result.setBlockToken(new TokenWritable(tok.getIdentifier(), tok.getPassword(), + tok.getKind(), tok.getService())); + return result; + } + + static public LocatedBlockWritable[] + convertLocatedBlock(org.apache.hadoop.hdfs.protocol.LocatedBlock[] lb) { + if (lb == null) return null; + final int len = lb.length; + LocatedBlockWritable[] result = new LocatedBlockWritable[len]; + for (int i = 0; i < len; ++i) { + result[i] = new LocatedBlockWritable( + ExtendedBlockWritable.convertExtendedBlock(lb[i].getBlock()), + DatanodeInfoWritable.convertDatanodeInfo(lb[i].getLocations()), + lb[i].getStartOffset(), lb[i].isCorrupt()); + } + return result; + } + + static public org.apache.hadoop.hdfs.protocol.LocatedBlock[] + convertLocatedBlock(LocatedBlockWritable[] lb) { + if (lb == null) return null; + final int len = lb.length; + org.apache.hadoop.hdfs.protocol.LocatedBlock[] result = + new org.apache.hadoop.hdfs.protocol.LocatedBlock[len]; + for (int i = 0; i < len; ++i) { + result[i] = new org.apache.hadoop.hdfs.protocol.LocatedBlock( + ExtendedBlockWritable.convertExtendedBlock(lb[i].getBlock()), + DatanodeInfoWritable.convertDatanodeInfo(lb[i].getLocations()), + lb[i].getStartOffset(), lb[i].isCorrupt()); + } + return result; + } + + static public List + convertLocatedBlock( + List lb) { + if (lb == null) return null; + final int len = lb.size(); + List result = + new ArrayList(len); + for (int i = 0; i < len; ++i) { + result.add(LocatedBlockWritable.convertLocatedBlock(lb.get(i))); + } + return result; + } + + static public List + convertLocatedBlock2(List lb) { + if (lb == null) return null; + final int len = lb.size(); + List result = new ArrayList(len); + for (int i = 0; i < len; ++i) { + result.add(LocatedBlockWritable.convertLocatedBlock(lb.get(i))); + } + return result; + } + + public LocatedBlockWritable() { + this(new ExtendedBlockWritable(), new DatanodeInfoWritable[0], 0L, false); + } + + public LocatedBlockWritable(ExtendedBlockWritable eb) { + this(eb, new DatanodeInfoWritable[0], 0L, false); + } + + public LocatedBlockWritable(ExtendedBlockWritable b, DatanodeInfoWritable[] locs) { + this(b, locs, -1, false); // startOffset is unknown + } + + public LocatedBlockWritable(ExtendedBlockWritable b, DatanodeInfoWritable[] locs, long startOffset) { + this(b, locs, startOffset, false); + } + + public 
LocatedBlockWritable(ExtendedBlockWritable b, DatanodeInfoWritable[] locs, long startOffset, + boolean corrupt) { + this.b = b; + this.offset = startOffset; + this.corrupt = corrupt; + if (locs==null) { + this.locs = new DatanodeInfoWritable[0]; + } else { + this.locs = locs; + } + } + + public TokenWritable getBlockToken() { + return blockToken; + } + + public void setBlockToken(TokenWritable token) { + this.blockToken = token; + } + + public ExtendedBlockWritable getBlock() { + return b; + } + + public DatanodeInfoWritable[] getLocations() { + return locs; + } + + public long getStartOffset() { + return offset; + } + + public long getBlockSize() { + return b.getNumBytes(); + } + + void setStartOffset(long value) { + this.offset = value; + } + + void setCorrupt(boolean corrupt) { + this.corrupt = corrupt; + } + + public boolean isCorrupt() { + return this.corrupt; + } + + /////////////////////////////////////////// + // Writable + /////////////////////////////////////////// + @Override + public void write(DataOutput out) throws IOException { + blockToken.write(out); + out.writeBoolean(corrupt); + out.writeLong(offset); + b.write(out); + out.writeInt(locs.length); + for (int i = 0; i < locs.length; i++) { + locs[i].write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + blockToken.readFields(in); + this.corrupt = in.readBoolean(); + offset = in.readLong(); + this.b = new ExtendedBlockWritable(); + b.readFields(in); + int count = in.readInt(); + this.locs = new DatanodeInfoWritable[count]; + for (int i = 0; i < locs.length; i++) { + locs[i] = new DatanodeInfoWritable(); + locs[i].readFields(in); + } + } + + /** Read LocatedBlock from in. */ + public static LocatedBlockWritable read(DataInput in) throws IOException { + final LocatedBlockWritable lb = new LocatedBlockWritable(); + lb.readFields(in); + return lb; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + b + + "; getBlockSize()=" + getBlockSize() + + "; corrupt=" + corrupt + + "; offset=" + offset + + "; locs=" + java.util.Arrays.asList(locs) + + "}"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlocksWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlocksWritable.java new file mode 100644 index 0000000000..c38eb6ea4c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlocksWritable.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +import org.apache.avro.reflect.Nullable; + +/** + * Collection of blocks with their locations and the file length. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class LocatedBlocksWritable implements Writable { + private long fileLength; + private List blocks; // array of blocks with prioritized locations + private boolean underConstruction; + @Nullable + private LocatedBlockWritable lastLocatedBlock = null; + private boolean isLastBlockComplete = false; + + public static org.apache.hadoop.hdfs.protocol.LocatedBlocks convertLocatedBlocks( + LocatedBlocksWritable lb) { + if (lb == null) { + return null; + } + return new org.apache.hadoop.hdfs.protocol.LocatedBlocks( + lb.getFileLength(), lb.isUnderConstruction(), + LocatedBlockWritable.convertLocatedBlock(lb.getLocatedBlocks()), + LocatedBlockWritable.convertLocatedBlock(lb.getLastLocatedBlock()), + lb.isLastBlockComplete()); + } + + public static LocatedBlocksWritable convertLocatedBlocks( + org.apache.hadoop.hdfs.protocol.LocatedBlocks lb) { + if (lb == null) { + return null; + } + return new LocatedBlocksWritable(lb.getFileLength(), lb.isUnderConstruction(), + LocatedBlockWritable.convertLocatedBlock2(lb.getLocatedBlocks()), + LocatedBlockWritable.convertLocatedBlock(lb.getLastLocatedBlock()), + lb.isLastBlockComplete()); + } + + public LocatedBlocksWritable() { + this(0, false, null, null, false); + } + + /** public Constructor */ + public LocatedBlocksWritable(long flength, boolean isUnderConstuction, + List blks, + LocatedBlockWritable lastBlock, boolean isLastBlockCompleted) { + fileLength = flength; + blocks = blks; + underConstruction = isUnderConstuction; + this.lastLocatedBlock = lastBlock; + this.isLastBlockComplete = isLastBlockCompleted; + } + + /** + * Get located blocks. + */ + public List getLocatedBlocks() { + return blocks; + } + + /** Get the last located block. */ + public LocatedBlockWritable getLastLocatedBlock() { + return lastLocatedBlock; + } + + /** Is the last block completed? */ + public boolean isLastBlockComplete() { + return isLastBlockComplete; + } + + /** + * Get located block. + */ + public LocatedBlockWritable get(int index) { + return blocks.get(index); + } + + /** + * Get number of located blocks. + */ + public int locatedBlockCount() { + return blocks == null ? 0 : blocks.size(); + } + + /** + * Get file length + */ + public long getFileLength() { + return this.fileLength; + } + + /** + * Return ture if file was under construction when + * this LocatedBlocks was constructed, false otherwise. 
+ */ + public boolean isUnderConstruction() { + return underConstruction; + } + + ////////////////////////////////////////////////// + // Writable + ////////////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory + (LocatedBlocksWritable.class, + new WritableFactory() { + public Writable newInstance() { return new LocatedBlocksWritable(); } + }); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(this.fileLength); + out.writeBoolean(underConstruction); + + //write the last located block + final boolean isNull = lastLocatedBlock == null; + out.writeBoolean(isNull); + if (!isNull) { + lastLocatedBlock.write(out); + } + out.writeBoolean(isLastBlockComplete); + + // write located blocks + int nrBlocks = locatedBlockCount(); + out.writeInt(nrBlocks); + if (nrBlocks == 0) { + return; + } + for (LocatedBlockWritable blk : this.blocks) { + blk.write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.fileLength = in.readLong(); + underConstruction = in.readBoolean(); + + //read the last located block + final boolean isNull = in.readBoolean(); + if (!isNull) { + lastLocatedBlock = LocatedBlockWritable.read(in); + } + isLastBlockComplete = in.readBoolean(); + + // read located blocks + int nrBlocks = in.readInt(); + this.blocks = new ArrayList(nrBlocks); + for (int idx = 0; idx < nrBlocks; idx++) { + LocatedBlockWritable blk = new LocatedBlockWritable(); + blk.readFields(in); + this.blocks.add(blk); + } + } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(getClass().getSimpleName()); + b.append("{") + .append("\n fileLength=").append(fileLength) + .append("\n underConstruction=").append(underConstruction) + .append("\n blocks=").append(blocks) + .append("\n lastLocatedBlock=").append(lastLocatedBlock) + .append("\n isLastBlockComplete=").append(isLastBlockComplete) + .append("}"); + return b.toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ProtocolSignatureWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ProtocolSignatureWritable.java new file mode 100644 index 0000000000..9dc929bf53 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ProtocolSignatureWritable.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ProtocolSignatureWritable implements Writable { + static { // register a ctor + WritableFactories.setFactory + (ProtocolSignatureWritable.class, + new WritableFactory() { + public Writable newInstance() { return new ProtocolSignatureWritable(); } + }); + } + + private long version; + private int[] methods = null; // an array of method hash codes + + public static org.apache.hadoop.ipc.ProtocolSignature convert( + final ProtocolSignatureWritable ps) { + if (ps == null) return null; + return new org.apache.hadoop.ipc.ProtocolSignature( + ps.getVersion(), ps.getMethods()); + } + + public static ProtocolSignatureWritable convert( + final org.apache.hadoop.ipc.ProtocolSignature ps) { + if (ps == null) return null; + return new ProtocolSignatureWritable(ps.getVersion(), ps.getMethods()); + } + + /** + * default constructor + */ + public ProtocolSignatureWritable() { + } + + /** + * Constructor + * + * @param version server version + * @param methodHashcodes hash codes of the methods supported by server + */ + public ProtocolSignatureWritable(long version, int[] methodHashcodes) { + this.version = version; + this.methods = methodHashcodes; + } + + public long getVersion() { + return version; + } + + public int[] getMethods() { + return methods; + } + + @Override + public void readFields(DataInput in) throws IOException { + version = in.readLong(); + boolean hasMethods = in.readBoolean(); + if (hasMethods) { + int numMethods = in.readInt(); + methods = new int[numMethods]; + for (int i=0; i + + + + Namenode Client Protocols Compatible with the version + of Hadoop Release 23 + + +

+This package is for ALL versions of the HDFS client protocols that use writable data types +and that are compatible with the version of the protocol shipped with Release 23 of Hadoop. +
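For illustration only (not part of this patch; the Foo/FooWritable names below are hypothetical): every type in this package follows the same shape, a frozen Writable wire class plus static convert helpers to and from the current protocol type, as DatanodeInfoWritable and DirectoryListingWritable do above. A minimal sketch:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    /** Stand-in for a current (trunk) protocol data type; hypothetical. */
    class Foo {
      final long id;
      Foo(long id) { this.id = id; }
    }

    /** Frozen R23 wire representation of Foo; its serialized form must never change. */
    class FooWritable implements Writable {
      private long id;

      FooWritable() { }                       // no-arg ctor, needed for deserialization
      FooWritable(long id) { this.id = id; }

      /** Convert wire type to the current protocol type. */
      static Foo convertFoo(FooWritable fw) {
        return fw == null ? null : new Foo(fw.id);
      }

      /** Convert the current protocol type to the wire type. */
      static FooWritable convertFoo(Foo f) {
        return f == null ? null : new FooWritable(f.id);
      }

      @Override
      public void write(DataOutput out) throws IOException { out.writeLong(id); }

      @Override
      public void readFields(DataInput in) throws IOException { id = in.readLong(); }
    }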

+ +Compatibility should be maintained: +
    +
  • Do NOT delete any methods
  • Do NOT change the signatures of any method: do not change parameters, parameter types, or exceptions thrown by the method.
+

+You can add new methods and new types. If you need to change a method's +signature, please add a new method instead. +When you add new methods and new types, do NOT change the version number. +
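As a hedged sketch of what "add a new method instead" means in practice (the FooWireProtocol interface and its methods are hypothetical, reusing the FooWritable sketch above): the existing method keeps its exact signature, the new behaviour arrives as an additional method, here suffixed with 2 in the same spirit as convertLocatedBlock2 earlier in this patch, and versionID is left alone.

    import java.io.IOException;
    import org.apache.hadoop.ipc.VersionedProtocol;

    /** Hypothetical R23-style wire protocol evolving compatibly. */
    interface FooWireProtocol extends VersionedProtocol {
      long versionID = 1L;  // NOT bumped when methods or types are only added

      /** Existing method: parameters, types and thrown exceptions must never change. */
      long getFooLength(FooWritable foo) throws IOException;

      /** New capability is exposed as a NEW method; old clients simply never call it. */
      long getFooLength2(FooWritable foo, boolean refreshCache) throws IOException;
    }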

+The version number is changed ONLY when compatibility is broken (which +should be very rare and treated as a major change). +
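A sketch of how that version rule shows up on the server side (hypothetical names again, in the spirit of the NameNodeRpcServer change later in this patch): getProtocolVersion() returns the unchanged versionID for wire protocols the server still speaks and rejects anything else explicitly rather than letting versions drift.

    // Inside a hypothetical RPC server that exposes FooWireProtocol from the sketch above.
    public long getProtocolVersion(String protocol, long clientVersion)
        throws IOException {
      if (protocol.equals(FooWireProtocol.class.getName())) {
        return FooWireProtocol.versionID;  // same constant for every compatible release
      }
      // Unknown or pre-R23 protocols are refused outright instead of half-working.
      throw new IOException("Protocol not supported: " + protocol
          + ". Switch the client to " + FooWireProtocol.class.getName());
    }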

\ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d4f5bc19f7..3e40ab43da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -123,6 +123,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolServerSideTranslatorR23; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@ -576,8 +577,13 @@ private void initIpcServer(Configuration conf) throws IOException { InetSocketAddress ipcAddr = NetUtils.createSocketAddr( conf.get("dfs.datanode.ipc.address")); - // Add all the RPC protocols that the Datanode implements - ipcServer = RPC.getServer(ClientDatanodeProtocol.class, this, ipcAddr.getHostName(), + // Add all the RPC protocols that the Datanode implements + ClientDatanodeProtocolServerSideTranslatorR23 + clientDatanodeProtocolServerTranslator = + new ClientDatanodeProtocolServerSideTranslatorR23(this); + ipcServer = RPC.getServer( + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class, + clientDatanodeProtocolServerTranslator, ipcAddr.getHostName(), ipcAddr.getPort(), conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index f786504c37..be0be10341 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; @@ -143,10 +145,13 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) serviceRPCAddress = null; } // Add all the RPC protocols that the namenode implements - this.server = RPC.getServer(ClientProtocol.class, this, - socAddr.getHostName(), socAddr.getPort(), - handlerCount, false, conf, - namesystem.getDelegationTokenSecretManager()); + ClientNamenodeProtocolServerSideTranslatorR23 clientProtocolServerTranslator = + new ClientNamenodeProtocolServerSideTranslatorR23(this); + this.server = RPC.getServer( + 
org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol.class, + clientProtocolServerTranslator, socAddr.getHostName(), + socAddr.getPort(), handlerCount, false, conf, + namesystem.getDelegationTokenSecretManager()); this.server.addProtocol(DatanodeProtocol.class, this); this.server.addProtocol(NamenodeProtocol.class, this); this.server.addProtocol(RefreshAuthorizationPolicyProtocol.class, this); @@ -210,7 +215,8 @@ public ProtocolSignature getProtocolSignature(String protocol, public long getProtocolVersion(String protocol, long clientVersion) throws IOException { if (protocol.equals(ClientProtocol.class.getName())) { - return ClientProtocol.versionID; + throw new IOException("Old Namenode Client protocol is not supported:" + + protocol + "Switch your clientside to " + ClientNamenodeWireProtocol.class); } else if (protocol.equals(DatanodeProtocol.class.getName())){ return DatanodeProtocol.versionID; } else if (protocol.equals(NamenodeProtocol.class.getName())){ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index 9ad87fe087..fd9c91d88c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -51,12 +51,12 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.io.TestWritable; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtocolSignature; @@ -96,9 +96,9 @@ public class TestBlockToken { ((Log4JLogger) SaslRpcServer.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger) SaslInputStream.LOG).getLogger().setLevel(Level.ALL); } - + /** Directory where we can count our open file descriptors under Linux */ - static File FD_DIR = new File("/proc/self/fd/"); + static File FD_DIR = new File("/proc/self/fd/"); long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins long blockTokenLifetime = 2 * 60 * 1000; // 2 mins @@ -120,7 +120,8 @@ public getLengthAnswer(BlockTokenSecretManager sm, public Long answer(InvocationOnMock invocation) throws IOException { Object args[] = invocation.getArguments(); assertEquals(1, args.length); - ExtendedBlock block = (ExtendedBlock) args[0]; + org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable block = + (org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable) args[0]; Set tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); assertEquals("Only one BlockTokenIdentifier expected", 1, tokenIds.size()); @@ -129,7 +130,9 @@ public Long answer(InvocationOnMock invocation) throws IOException { BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; LOG.info("Got: " + id.toString()); assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id)); - sm.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.WRITE); 
+ sm.checkAccess(id, null, org.apache.hadoop.hdfs.protocolR23Compatible. + ExtendedBlockWritable.convertExtendedBlock(block), + BlockTokenSecretManager.AccessMode.WRITE); result = id.getBlockId(); } return result; @@ -137,7 +140,8 @@ public Long answer(InvocationOnMock invocation) throws IOException { } private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm, - ExtendedBlock block, EnumSet accessModes) + ExtendedBlock block, + EnumSet accessModes) throws IOException { Token token = sm.generateToken(block, accessModes); BlockTokenIdentifier id = sm.createIdentifier(); @@ -151,12 +155,12 @@ public void testWritable() throws Exception { TestWritable.testWritable(new BlockTokenIdentifier()); BlockTokenSecretManager sm = new BlockTokenSecretManager(true, blockKeyUpdateInterval, blockTokenLifetime); - TestWritable.testWritable(generateTokenId(sm, block1, EnumSet - .allOf(BlockTokenSecretManager.AccessMode.class))); - TestWritable.testWritable(generateTokenId(sm, block2, EnumSet - .of(BlockTokenSecretManager.AccessMode.WRITE))); - TestWritable.testWritable(generateTokenId(sm, block3, EnumSet - .noneOf(BlockTokenSecretManager.AccessMode.class))); + TestWritable.testWritable(generateTokenId(sm, block1, + EnumSet.allOf(BlockTokenSecretManager.AccessMode.class))); + TestWritable.testWritable(generateTokenId(sm, block2, + EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE))); + TestWritable.testWritable(generateTokenId(sm, block3, + EnumSet.noneOf(BlockTokenSecretManager.AccessMode.class))); } private void tokenGenerationAndVerification(BlockTokenSecretManager master, @@ -176,8 +180,8 @@ private void tokenGenerationAndVerification(BlockTokenSecretManager master, slave.checkAccess(token2, null, block2, mode); } // multi-mode tokens - Token mtoken = master.generateToken(block3, EnumSet - .allOf(BlockTokenSecretManager.AccessMode.class)); + Token mtoken = master.generateToken(block3, + EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)); for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode .values()) { master.checkAccess(mtoken, null, block3, mode); @@ -202,25 +206,28 @@ public void testBlockTokenSecretManager() throws Exception { slaveHandler.setKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler); } - + private Server createMockDatanode(BlockTokenSecretManager sm, Token token) throws IOException { - ClientDatanodeProtocol mockDN = mock(ClientDatanodeProtocol.class); + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol mockDN = + mock(org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class); when(mockDN.getProtocolVersion(anyString(), anyLong())).thenReturn( - ClientDatanodeProtocol.versionID); - doReturn(ProtocolSignature.getProtocolSignature( - mockDN, ClientDatanodeProtocol.class.getName(), - ClientDatanodeProtocol.versionID, 0)) - .when(mockDN).getProtocolSignature(anyString(), anyLong(), anyInt()); + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.versionID); + doReturn( + ProtocolSignature.getProtocolSignature(mockDN, + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class.getName(), + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.versionID, 0)).when(mockDN) + .getProtocolSignature(anyString(), anyLong(), anyInt()); BlockTokenIdentifier id = sm.createIdentifier(); id.readFields(new DataInputStream(new ByteArrayInputStream(token .getIdentifier()))); doAnswer(new getLengthAnswer(sm, 
id)).when(mockDN).getReplicaVisibleLength( - any(ExtendedBlock.class)); + any(org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable.class)); - return RPC.getServer(ClientDatanodeProtocol.class, mockDN, - ADDRESS, 0, 5, true, conf, sm); + return RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class, + mockDN, ADDRESS, 0, 5, + true, conf, sm); } @Test @@ -241,9 +248,8 @@ public void testBlockTokenRpc() throws Exception { ClientDatanodeProtocol proxy = null; try { - proxy = RPC.getProxy( - ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, addr, - ticket, conf, NetUtils.getDefaultSocketFactory(conf)); + proxy = DFSUtil.createClientDatanodeProtocolProxy(addr, ticket, conf, + NetUtils.getDefaultSocketFactory(conf)); assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3)); } finally { server.stop(); @@ -255,8 +261,8 @@ public void testBlockTokenRpc() throws Exception { /** * Test that fast repeated invocations of createClientDatanodeProtocolProxy - * will not end up using up thousands of sockets. This is a regression test for - * HDFS-1965. + * will not end up using up thousands of sockets. This is a regression test + * for HDFS-1965. */ @Test public void testBlockTokenRpcLeak() throws Exception { @@ -270,9 +276,9 @@ public void testBlockTokenRpcLeak() throws Exception { server.start(); final InetSocketAddress addr = NetUtils.getConnectAddress(server); - DatanodeID fakeDnId = new DatanodeID( - "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort()); - + DatanodeID fakeDnId = new DatanodeID("localhost:" + addr.getPort(), + "fake-storage", 0, addr.getPort()); + ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L)); LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]); fakeBlock.setBlockToken(token); @@ -282,19 +288,19 @@ public void testBlockTokenRpcLeak() throws Exception { // RPC "Client" object to stay above 0 such that RPC.stopProxy doesn't // actually close the TCP connections to the real target DN. ClientDatanodeProtocol proxyToNoWhere = RPC.getProxy( - ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, + ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, new InetSocketAddress("1.1.1.1", 1), - UserGroupInformation.createRemoteUser("junk"), - conf, NetUtils.getDefaultSocketFactory(conf)); - + UserGroupInformation.createRemoteUser("junk"), conf, + NetUtils.getDefaultSocketFactory(conf)); + ClientDatanodeProtocol proxy = null; int fdsAtStart = countOpenFileDescriptors(); try { long endTime = System.currentTimeMillis() + 3000; while (System.currentTimeMillis() < endTime) { - proxy = DFSUtil.createClientDatanodeProtocolProxy( - fakeDnId, conf, 1000, fakeBlock); + proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000, + fakeBlock); assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3)); if (proxy != null) { RPC.stopProxy(proxy); @@ -303,32 +309,31 @@ public void testBlockTokenRpcLeak() throws Exception { } int fdsAtEnd = countOpenFileDescriptors(); - + if (fdsAtEnd - fdsAtStart > 50) { fail("Leaked " + (fdsAtEnd - fdsAtStart) + " fds!"); } } finally { server.stop(); } - + RPC.stopProxy(proxyToNoWhere); } /** - * @return the current number of file descriptors open by this - * process. + * @return the current number of file descriptors open by this process. 
*/ private static int countOpenFileDescriptors() throws IOException { return FD_DIR.list().length; } - /** + /** * Test {@link BlockPoolTokenSecretManager} */ @Test public void testBlockPoolTokenSecretManager() throws Exception { BlockPoolTokenSecretManager bpMgr = new BlockPoolTokenSecretManager(); - + // Test BlockPoolSecretManager with upto 10 block pools for (int i = 0; i < 10; i++) { String bpid = Integer.toString(i); @@ -337,12 +342,11 @@ public void testBlockPoolTokenSecretManager() throws Exception { BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false, blockKeyUpdateInterval, blockTokenLifetime); bpMgr.addBlockPool(bpid, slaveHandler); - - + ExportedBlockKeys keys = masterHandler.exportKeys(); bpMgr.setKeys(bpid, keys); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); - + // Test key updating masterHandler.updateKeys(); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); @@ -351,11 +355,12 @@ public void testBlockPoolTokenSecretManager() throws Exception { tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); } } - + /** - * This test writes a file and gets the block locations without closing - * the file, and tests the block token in the last block. Block token is - * verified by ensuring it is of correct kind. + * This test writes a file and gets the block locations without closing the + * file, and tests the block token in the last block. Block token is verified + * by ensuring it is of correct kind. + * * @throws IOException * @throws InterruptedException */ @@ -389,5 +394,5 @@ public void testBlockTokenInLastLocatedBlock() throws IOException, } finally { cluster.shutdown(); } - } + } }
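For completeness, a hedged sketch of the client-side pattern the HDFS-1965 regression test above protects (dnId, conf, timeout, locatedBlock and block stand in for the values the test builds; the two calls shown are the ones the test exercises): every proxy created in the loop must be stopped, otherwise each iteration leaks a file descriptor.

    // Sketch only: variable setup is assumed to match the surrounding test.
    ClientDatanodeProtocol proxy = null;
    try {
      proxy = DFSUtil.createClientDatanodeProtocolProxy(dnId, conf, timeout, locatedBlock);
      long visibleLength = proxy.getReplicaVisibleLength(block);  // use the proxy
    } finally {
      if (proxy != null) {
        RPC.stopProxy(proxy);  // closes the underlying RPC client so fds do not accumulate
      }
    }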