diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 516ae556f9..75ce728acf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -11,6 +11,7 @@ Trunk (unreleased changes) HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants -> HdfsConstants. (Harsh J Chouraria via atm) + HDFS-2197. Refactor RPC call implementations out of NameNode class (todd) BUG FIXES HDFS-2287. TestParallelRead has a small off-by-one bug. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index 36f10d16fd..c3e948d272 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -52,7 +52,7 @@ * */ @InterfaceAudience.Private -public class BackupNode extends NameNode implements JournalProtocol { +public class BackupNode extends NameNode { private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY; private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT; private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY; @@ -74,7 +74,6 @@ public class BackupNode extends NameNode implements JournalProtocol { BackupNode(Configuration conf, NamenodeRole role) throws IOException { super(conf, role); - this.server.addProtocol(JournalProtocol.class, this); } ///////////////////////////////////////////////////// @@ -96,18 +95,20 @@ protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) throw } @Override // NameNode - protected void setRpcServerAddress(Configuration conf) { - conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(rpcAddress)); + protected void setRpcServerAddress(Configuration conf, + InetSocketAddress addr) { + conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(addr)); } @Override // Namenode - protected void setRpcServiceServerAddress(Configuration conf) { - conf.set(BN_SERVICE_RPC_ADDRESS_KEY, getHostPortString(serviceRPCAddress)); + protected void setRpcServiceServerAddress(Configuration conf, + InetSocketAddress addr) { + conf.set(BN_SERVICE_RPC_ADDRESS_KEY, getHostPortString(addr)); } @Override // NameNode protected InetSocketAddress getHttpServerAddress(Configuration conf) { - assert rpcAddress != null : "rpcAddress should be calculated first"; + assert getNameNodeAddress() != null : "rpcAddress should be calculated first"; String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT); return NetUtils.createSocketAddr(addr); } @@ -146,6 +147,12 @@ protected void initialize(Configuration conf) throws IOException { runCheckpointDaemon(conf); } + @Override + protected NameNodeRpcServer createRpcServer(Configuration conf) + throws IOException { + return new BackupNodeRpcServer(conf, this); + } + @Override // NameNode public void stop() { if(checkpointManager != null) { @@ -178,74 +185,84 @@ public void stop() { super.stop(); } + static class BackupNodeRpcServer extends NameNodeRpcServer implements JournalProtocol { + private final String nnRpcAddress; + + private BackupNodeRpcServer(Configuration conf, BackupNode nn) + throws IOException { + super(conf, nn); + this.server.addProtocol(JournalProtocol.class, this); + 
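
The patch turns createRpcServer() into the extension point that lets the BackupNode substitute its own RPC handling instead of implementing JournalProtocol on the node class itself. Below is a minimal sketch of that factory-method pattern only; the Sketch* names are placeholders, and the real constructors take (Configuration, NameNode) and build RPC.Server instances as shown in this diff.

    import java.io.IOException;

    class SketchNameNode {
      protected SketchRpcServer rpcServer;

      protected void initialize() throws IOException {
        // initialize() no longer builds RPC servers inline; it asks the factory method.
        rpcServer = createRpcServer();
      }

      // Subclasses override this to supply a different server implementation.
      protected SketchRpcServer createRpcServer() throws IOException {
        return new SketchRpcServer();
      }
    }

    class SketchBackupNode extends SketchNameNode {
      @Override
      protected SketchRpcServer createRpcServer() throws IOException {
        // BackupNodeRpcServer additionally registers JournalProtocol via addProtocol().
        return new SketchBackupNodeRpcServer();
      }
    }

    class SketchRpcServer { /* stands in for NameNodeRpcServer */ }
    class SketchBackupNodeRpcServer extends SketchRpcServer { /* stands in for BackupNodeRpcServer */ }
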
nnRpcAddress = nn.nnRpcAddress; + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (protocol.equals(JournalProtocol.class.getName())) { + return JournalProtocol.versionID; + } else { + return super.getProtocolVersion(protocol, clientVersion); + } + } - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - if (protocol.equals(JournalProtocol.class.getName())) { - return JournalProtocol.versionID; - } else { - return super.getProtocolVersion(protocol, clientVersion); + ///////////////////////////////////////////////////// + // NamenodeProtocol implementation for backup node. + ///////////////////////////////////////////////////// + @Override // NamenodeProtocol + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) + throws IOException { + throw new UnsupportedActionException("getBlocks"); + } + + // Only active name-node can register other nodes. + @Override // NamenodeProtocol + public NamenodeRegistration register(NamenodeRegistration registration + ) throws IOException { + throw new UnsupportedActionException("register"); + } + + @Override // NamenodeProtocol + public NamenodeCommand startCheckpoint(NamenodeRegistration registration) + throws IOException { + throw new UnsupportedActionException("startCheckpoint"); + } + + @Override // NamenodeProtocol + public void endCheckpoint(NamenodeRegistration registration, + CheckpointSignature sig) throws IOException { + throw new UnsupportedActionException("endCheckpoint"); + } + + ///////////////////////////////////////////////////// + // BackupNodeProtocol implementation for backup node. + ///////////////////////////////////////////////////// + + @Override + public void journal(NamenodeRegistration nnReg, + long firstTxId, int numTxns, + byte[] records) throws IOException { + verifyRequest(nnReg); + if(!nnRpcAddress.equals(nnReg.getAddress())) + throw new IOException("Journal request from unexpected name-node: " + + nnReg.getAddress() + " expecting " + rpcAddress); + getBNImage().journal(firstTxId, numTxns, records); + } + + @Override + public void startLogSegment(NamenodeRegistration registration, long txid) + throws IOException { + verifyRequest(registration); + + getBNImage().namenodeStartedLogSegment(txid); + } + + private BackupImage getBNImage() { + return (BackupImage)nn.getFSImage(); } } - - ///////////////////////////////////////////////////// - // NamenodeProtocol implementation for backup node. - ///////////////////////////////////////////////////// - @Override // NamenodeProtocol - public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) - throws IOException { - throw new UnsupportedActionException("getBlocks"); - } - - // Only active name-node can register other nodes. - @Override // NamenodeProtocol - public NamenodeRegistration register(NamenodeRegistration registration - ) throws IOException { - throw new UnsupportedActionException("register"); - } - - @Override // NamenodeProtocol - public NamenodeCommand startCheckpoint(NamenodeRegistration registration) - throws IOException { - throw new UnsupportedActionException("startCheckpoint"); - } - - @Override // NamenodeProtocol - public void endCheckpoint(NamenodeRegistration registration, - CheckpointSignature sig) throws IOException { - throw new UnsupportedActionException("endCheckpoint"); - } - - ///////////////////////////////////////////////////// - // BackupNodeProtocol implementation for backup node. 
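
Since BackupNodeRpcServer answers for JournalProtocol and defers every other protocol lookup to NameNodeRpcServer, the active NameNode can reach the backup node with an ordinary JournalProtocol proxy. A hedged client-side sketch, assuming JournalProtocol lives in org.apache.hadoop.hdfs.server.protocol like the other server-side protocols and that the four-argument RPC.getProxy overload is available; the address argument is a placeholder for the configured backup-node RPC address.

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
    import org.apache.hadoop.ipc.RPC;

    class JournalProxySketch {
      // Returns a JournalProtocol proxy to a backup node; a caller would then stream
      // edits with journal(...) and startLogSegment(...) as implemented above.
      static JournalProtocol connect(InetSocketAddress backupNodeAddr) throws IOException {
        Configuration conf = new HdfsConfiguration();
        return RPC.getProxy(JournalProtocol.class, JournalProtocol.versionID,
            backupNodeAddr, conf);
      }
    }
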
- ///////////////////////////////////////////////////// - - @Override - public void journal(NamenodeRegistration nnReg, - long firstTxId, int numTxns, - byte[] records) throws IOException { - verifyRequest(nnReg); - if(!nnRpcAddress.equals(nnReg.getAddress())) - throw new IOException("Journal request from unexpected name-node: " - + nnReg.getAddress() + " expecting " + nnRpcAddress); - getBNImage().journal(firstTxId, numTxns, records); - } - - @Override - public void startLogSegment(NamenodeRegistration registration, long txid) - throws IOException { - verifyRequest(registration); - getBNImage().namenodeStartedLogSegment(txid); - } - ////////////////////////////////////////////////////// - - BackupImage getBNImage() { - return (BackupImage)getFSImage(); - } boolean shouldCheckpointAtStartup() { FSImage fsImage = getFSImage(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java index e4de6345b6..2a41aeeb9b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java @@ -69,7 +69,7 @@ protected void doGet(final HttpServletRequest req, final HttpServletResponse res try { ugi.doAs(new PrivilegedExceptionAction() { public Void run() throws Exception { - nn.cancelDelegationToken(token); + nn.getRpcServer().cancelDelegationToken(token); return null; } }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java index ea0f392a3d..1c8253f665 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java @@ -73,7 +73,7 @@ protected ClientProtocol createNameNodeProxy() throws IOException { // rpc NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context); if (nn != null) { - return nn; + return nn.getRpcServer(); } InetSocketAddress nnAddr = NameNodeHttpServer.getNameNodeAddressFromContext(context); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java index 2c0f81abc5..4fc9dcca63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java @@ -75,7 +75,7 @@ public Void run() throws Exception { + ":" + NameNode.getAddress(conf).getPort(); Token token = - nn.getDelegationToken(new Text(renewerFinal)); + nn.getRpcServer().getDelegationToken(new Text(renewerFinal)); if(token == null) { throw new Exception("couldn't get the token for " +s); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 95bad27440..2638b08087 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; + import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; -import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -32,79 +33,21 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.FsServerDefaults; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Trash; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.permission.PermissionStatus; -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; -import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; -import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; -import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; -import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; -import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; -import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; -import 
org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; -import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; -import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; -import org.apache.hadoop.io.EnumSetWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.net.Node; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Groups; -import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.StringUtils; @@ -146,7 +89,7 @@ * NameNode state, for example partial blocksMap etc. **********************************************************/ @InterfaceAudience.Private -public class NameNode implements NamenodeProtocols { +public class NameNode { static{ HdfsConfiguration.init(); } @@ -179,32 +122,6 @@ public class NameNode implements NamenodeProtocols { DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY }; - public long getProtocolVersion(String protocol, - long clientVersion) throws IOException { - if (protocol.equals(ClientProtocol.class.getName())) { - return ClientProtocol.versionID; - } else if (protocol.equals(DatanodeProtocol.class.getName())){ - return DatanodeProtocol.versionID; - } else if (protocol.equals(NamenodeProtocol.class.getName())){ - return NamenodeProtocol.versionID; - } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){ - return RefreshAuthorizationPolicyProtocol.versionID; - } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){ - return RefreshUserMappingsProtocol.versionID; - } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){ - return GetUserMappingsProtocol.versionID; - } else { - throw new IOException("Unknown protocol to name node: " + protocol); - } - } - - - @Override // VersionedProtocol - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - return ProtocolSignature.getProtocolSignature( - this, protocol, clientVersion, clientMethodsHash); - } public static final int DEFAULT_PORT = 8020; @@ -213,18 +130,6 @@ public ProtocolSignature getProtocolSignature(String protocol, protected FSNamesystem namesystem; protected NamenodeRole role; - /** RPC server. Package-protected for use in tests. */ - RPC.Server server; - /** RPC server for HDFS Services communication. - BackupNode, Datanodes and all other services - should be connecting to this server if it is - configured. 
Clients should only go to NameNode#server - */ - protected Server serviceRpcServer; - /** RPC server address */ - protected InetSocketAddress rpcAddress = null; - /** RPC server for DN address */ - protected InetSocketAddress serviceRPCAddress = null; /** httpServer */ protected NameNodeHttpServer httpServer; private Thread emptier; @@ -232,11 +137,11 @@ public ProtocolSignature getProtocolSignature(String protocol, protected boolean stopRequested = false; /** Registration information of this name-node */ protected NamenodeRegistration nodeRegistration; - /** Is service level authorization enabled? */ - private boolean serviceAuthEnabled = false; /** Activated plug-ins. */ private List plugins; + private NameNodeRpcServer rpcServer; + /** Format a new filesystem. Destroys any filesystem that may already * exist at this location. **/ public static void format(Configuration conf) throws IOException { @@ -252,6 +157,10 @@ public FSNamesystem getNamesystem() { return namesystem; } + public NamenodeProtocols getRpcServer() { + return rpcServer; + } + static void initMetrics(Configuration conf, NamenodeRole role) { metrics = NameNodeMetrics.create(conf, role); } @@ -359,11 +268,13 @@ protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOExc /** * Modifies the configuration passed to contain the service rpc address setting */ - protected void setRpcServiceServerAddress(Configuration conf) { + protected void setRpcServiceServerAddress(Configuration conf, + InetSocketAddress serviceRPCAddress) { setServiceAddress(conf, getHostPortString(serviceRPCAddress)); } - protected void setRpcServerAddress(Configuration conf) { + protected void setRpcServerAddress(Configuration conf, + InetSocketAddress rpcAddress) { FileSystem.setDefaultUri(conf, getUri(rpcAddress)); } @@ -387,7 +298,7 @@ NamenodeRegistration getRegistration() { NamenodeRegistration setRegistration() { nodeRegistration = new NamenodeRegistration( - getHostPortString(rpcAddress), + getHostPortString(rpcServer.getRpcAddress()), getHostPortString(getHttpAddress()), getFSImage().getStorage(), getRole()); return nodeRegistration; @@ -408,45 +319,13 @@ void loginAsNameNodeUser(Configuration conf) throws IOException { * @param conf the configuration */ protected void initialize(Configuration conf) throws IOException { - InetSocketAddress socAddr = getRpcServerAddress(conf); UserGroupInformation.setConfiguration(conf); loginAsNameNodeUser(conf); - int handlerCount = - conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, - DFS_DATANODE_HANDLER_COUNT_DEFAULT); NameNode.initMetrics(conf, this.getRole()); loadNamesystem(conf); - // create rpc server - InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf); - if (dnSocketAddr != null) { - int serviceHandlerCount = - conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, - DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); - this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this, - dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, - false, conf, namesystem.getDelegationTokenSecretManager()); - this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); - setRpcServiceServerAddress(conf); - } - this.server = RPC.getServer(NamenodeProtocols.class, this, - socAddr.getHostName(), socAddr.getPort(), - handlerCount, false, conf, - namesystem.getDelegationTokenSecretManager()); - // set service-level authorization security policy - if (serviceAuthEnabled = - conf.getBoolean( - CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { - 
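
Because NameNode itself no longer implements NamenodeProtocols, in-process callers such as the servlets above go through getRpcServer() to reach the protocol methods. A small sketch of that calling convention, using method signatures that appear elsewhere in this patch; the path and permission values are placeholders.

    import java.io.IOException;

    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;

    class RpcServerCallerSketch {
      // 'nn' stands for an already-initialized NameNode; error handling omitted.
      static void example(NameNode nn) throws IOException {
        NamenodeProtocols rpc = nn.getRpcServer();
        rpc.mkdirs("/tmp/example", new FsPermission((short) 0755), true);
        boolean safeModeOn = rpc.setSafeMode(SafeModeAction.SAFEMODE_GET);
        System.out.println("in safe mode: " + safeModeOn);
      }
    }
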
this.server.refreshServiceAcl(conf, new HDFSPolicyProvider()); - if (this.serviceRpcServer != null) { - this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); - } - } - - // The rpc-server port can be ephemeral... ensure we have the correct info - this.rpcAddress = this.server.getListenerAddress(); - setRpcServerAddress(conf); + rpcServer = createRpcServer(conf); try { validateConfigurationSettings(conf); @@ -456,12 +335,21 @@ protected void initialize(Configuration conf) throws IOException { } activate(conf); - LOG.info(getRole() + " up at: " + rpcAddress); - if (serviceRPCAddress != null) { - LOG.info(getRole() + " service server is up at: " + serviceRPCAddress); + LOG.info(getRole() + " up at: " + rpcServer.getRpcAddress()); + if (rpcServer.getServiceRpcAddress() != null) { + LOG.info(getRole() + " service server is up at: " + rpcServer.getServiceRpcAddress()); } } + /** + * Create the RPC server implementation. Used as an extension point for the + * BackupNode. + */ + protected NameNodeRpcServer createRpcServer(Configuration conf) + throws IOException { + return new NameNodeRpcServer(conf, this); + } + /** * Verifies that the final Configuration Settings look ok for the NameNode to * properly start up @@ -494,10 +382,7 @@ void activate(Configuration conf) throws IOException { } namesystem.activate(conf); startHttpServer(conf); - server.start(); //start RPC server - if (serviceRpcServer != null) { - serviceRpcServer.start(); - } + rpcServer.start(); startTrashEmptier(conf); plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, @@ -577,7 +462,7 @@ protected NameNode(Configuration conf, NamenodeRole role) */ public void join() { try { - this.server.join(); + this.rpcServer.join(); } catch (InterruptedException ie) { } } @@ -607,8 +492,7 @@ public void stop() { } if(namesystem != null) namesystem.close(); if(emptier != null) emptier.interrupt(); - if(server != null) server.stop(); - if(serviceRpcServer != null) serviceRpcServer.stop(); + if(rpcServer != null) rpcServer.stop(); if (metrics != null) { metrics.shutdown(); } @@ -621,410 +505,6 @@ synchronized boolean isStopRequested() { return stopRequested; } - ///////////////////////////////////////////////////// - // NamenodeProtocol - ///////////////////////////////////////////////////// - @Override // NamenodeProtocol - public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) - throws IOException { - if(size <= 0) { - throw new IllegalArgumentException( - "Unexpected not positive size: "+size); - } - - return namesystem.getBlockManager().getBlocks(datanode, size); - } - - @Override // NamenodeProtocol - public ExportedBlockKeys getBlockKeys() throws IOException { - return namesystem.getBlockManager().getBlockKeys(); - } - - @Override // NamenodeProtocol - public void errorReport(NamenodeRegistration registration, - int errorCode, - String msg) throws IOException { - verifyRequest(registration); - LOG.info("Error report from " + registration + ": " + msg); - if(errorCode == FATAL) - namesystem.releaseBackupNode(registration); - } - - @Override // NamenodeProtocol - public NamenodeRegistration register(NamenodeRegistration registration) - throws IOException { - verifyVersion(registration.getVersion()); - NamenodeRegistration myRegistration = setRegistration(); - namesystem.registerBackupNode(registration, myRegistration); - return myRegistration; - } - - @Override // NamenodeProtocol - public NamenodeCommand startCheckpoint(NamenodeRegistration registration) - throws IOException { - 
verifyRequest(registration); - if(!isRole(NamenodeRole.NAMENODE)) - throw new IOException("Only an ACTIVE node can invoke startCheckpoint."); - return namesystem.startCheckpoint(registration, setRegistration()); - } - - @Override // NamenodeProtocol - public void endCheckpoint(NamenodeRegistration registration, - CheckpointSignature sig) throws IOException { - verifyRequest(registration); - if(!isRole(NamenodeRole.NAMENODE)) - throw new IOException("Only an ACTIVE node can invoke endCheckpoint."); - namesystem.endCheckpoint(registration, sig); - } - - @Override // ClientProtocol - public Token getDelegationToken(Text renewer) - throws IOException { - return namesystem.getDelegationToken(renewer); - } - - @Override // ClientProtocol - public long renewDelegationToken(Token token) - throws InvalidToken, IOException { - return namesystem.renewDelegationToken(token); - } - - @Override // ClientProtocol - public void cancelDelegationToken(Token token) - throws IOException { - namesystem.cancelDelegationToken(token); - } - - @Override // ClientProtocol - public LocatedBlocks getBlockLocations(String src, - long offset, - long length) - throws IOException { - metrics.incrGetBlockLocations(); - return namesystem.getBlockLocations(getClientMachine(), - src, offset, length); - } - - @Override // ClientProtocol - public FsServerDefaults getServerDefaults() throws IOException { - return namesystem.getServerDefaults(); - } - - @Override // ClientProtocol - public void create(String src, - FsPermission masked, - String clientName, - EnumSetWritable flag, - boolean createParent, - short replication, - long blockSize) throws IOException { - String clientMachine = getClientMachine(); - if (stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.create: file " - +src+" for "+clientName+" at "+clientMachine); - } - if (!checkPathLength(src)) { - throw new IOException("create: Pathname too long. 
Limit " - + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); - } - namesystem.startFile(src, - new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(), - null, masked), - clientName, clientMachine, flag.get(), createParent, replication, blockSize); - metrics.incrFilesCreated(); - metrics.incrCreateFileOps(); - } - - @Override // ClientProtocol - public LocatedBlock append(String src, String clientName) - throws IOException { - String clientMachine = getClientMachine(); - if (stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.append: file " - +src+" for "+clientName+" at "+clientMachine); - } - LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine); - metrics.incrFilesAppended(); - return info; - } - - @Override // ClientProtocol - public boolean recoverLease(String src, String clientName) throws IOException { - String clientMachine = getClientMachine(); - return namesystem.recoverLease(src, clientName, clientMachine); - } - - @Override // ClientProtocol - public boolean setReplication(String src, short replication) - throws IOException { - return namesystem.setReplication(src, replication); - } - - @Override // ClientProtocol - public void setPermission(String src, FsPermission permissions) - throws IOException { - namesystem.setPermission(src, permissions); - } - - @Override // ClientProtocol - public void setOwner(String src, String username, String groupname) - throws IOException { - namesystem.setOwner(src, username, groupname); - } - - @Override // ClientProtocol - public LocatedBlock addBlock(String src, - String clientName, - ExtendedBlock previous, - DatanodeInfo[] excludedNodes) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " - +src+" for "+clientName); - } - HashMap excludedNodesSet = null; - if (excludedNodes != null) { - excludedNodesSet = new HashMap(excludedNodes.length); - for (Node node:excludedNodes) { - excludedNodesSet.put(node, node); - } - } - LocatedBlock locatedBlock = - namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet); - if (locatedBlock != null) - metrics.incrAddBlockOps(); - return locatedBlock; - } - - @Override // ClientProtocol - public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk, - final DatanodeInfo[] existings, final DatanodeInfo[] excludes, - final int numAdditionalNodes, final String clientName - ) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("getAdditionalDatanode: src=" + src - + ", blk=" + blk - + ", existings=" + Arrays.asList(existings) - + ", excludes=" + Arrays.asList(excludes) - + ", numAdditionalNodes=" + numAdditionalNodes - + ", clientName=" + clientName); - } - - metrics.incrGetAdditionalDatanodeOps(); - - HashMap excludeSet = null; - if (excludes != null) { - excludeSet = new HashMap(excludes.length); - for (Node node : excludes) { - excludeSet.put(node, node); - } - } - return namesystem.getAdditionalDatanode(src, blk, - existings, excludeSet, numAdditionalNodes, clientName); - } - - /** - * The client needs to give up on the block. 
- */ - public void abandonBlock(ExtendedBlock b, String src, String holder) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: " - +b+" of file "+src); - } - if (!namesystem.abandonBlock(b, src, holder)) { - throw new IOException("Cannot abandon block during write to " + src); - } - } - - @Override // ClientProtocol - public boolean complete(String src, String clientName, ExtendedBlock last) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.complete: " - + src + " for " + clientName); - } - return namesystem.completeFile(src, clientName, last); - } - - /** - * The client has detected an error on the specified located blocks - * and is reporting them to the server. For now, the namenode will - * mark the block as corrupt. In the future we might - * check the blocks are actually corrupt. - */ - @Override - public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { - stateChangeLog.info("*DIR* NameNode.reportBadBlocks"); - for (int i = 0; i < blocks.length; i++) { - ExtendedBlock blk = blocks[i].getBlock(); - DatanodeInfo[] nodes = blocks[i].getLocations(); - for (int j = 0; j < nodes.length; j++) { - DatanodeInfo dn = nodes[j]; - namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn); - } - } - } - - @Override // ClientProtocol - public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName) - throws IOException { - return namesystem.updateBlockForPipeline(block, clientName); - } - - - @Override // ClientProtocol - public void updatePipeline(String clientName, ExtendedBlock oldBlock, - ExtendedBlock newBlock, DatanodeID[] newNodes) - throws IOException { - namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes); - } - - @Override // DatanodeProtocol - public void commitBlockSynchronization(ExtendedBlock block, - long newgenerationstamp, long newlength, - boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) - throws IOException { - namesystem.commitBlockSynchronization(block, - newgenerationstamp, newlength, closeFile, deleteblock, newtargets); - } - - @Override // ClientProtocol - public long getPreferredBlockSize(String filename) - throws IOException { - return namesystem.getPreferredBlockSize(filename); - } - - @Deprecated - @Override // ClientProtocol - public boolean rename(String src, String dst) throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); - } - if (!checkPathLength(dst)) { - throw new IOException("rename: Pathname too long. Limit " - + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); - } - boolean ret = namesystem.renameTo(src, dst); - if (ret) { - metrics.incrFilesRenamed(); - } - return ret; - } - - @Override // ClientProtocol - public void concat(String trg, String[] src) throws IOException { - namesystem.concat(trg, src); - } - - @Override // ClientProtocol - public void rename(String src, String dst, Options.Rename... options) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); - } - if (!checkPathLength(dst)) { - throw new IOException("rename: Pathname too long. 
Limit " - + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); - } - namesystem.renameTo(src, dst, options); - metrics.incrFilesRenamed(); - } - - @Deprecated - @Override // ClientProtocol - public boolean delete(String src) throws IOException { - return delete(src, true); - } - - @Override // ClientProtocol - public boolean delete(String src, boolean recursive) throws IOException { - if (stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* Namenode.delete: src=" + src - + ", recursive=" + recursive); - } - boolean ret = namesystem.delete(src, recursive); - if (ret) - metrics.incrDeleteFileOps(); - return ret; - } - - /** - * Check path length does not exceed maximum. Returns true if - * length and depth are okay. Returns false if length is too long - * or depth is too great. - */ - private boolean checkPathLength(String src) { - Path srcPath = new Path(src); - return (src.length() <= MAX_PATH_LENGTH && - srcPath.depth() <= MAX_PATH_DEPTH); - } - - @Override // ClientProtocol - public boolean mkdirs(String src, FsPermission masked, boolean createParent) - throws IOException { - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src); - } - if (!checkPathLength(src)) { - throw new IOException("mkdirs: Pathname too long. Limit " - + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); - } - return namesystem.mkdirs(src, - new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(), - null, masked), createParent); - } - - @Override // ClientProtocol - public void renewLease(String clientName) throws IOException { - namesystem.renewLease(clientName); - } - - @Override // ClientProtocol - public DirectoryListing getListing(String src, byte[] startAfter, - boolean needLocation) - throws IOException { - DirectoryListing files = namesystem.getListing( - src, startAfter, needLocation); - if (files != null) { - metrics.incrGetListingOps(); - metrics.incrFilesInGetListingOps(files.getPartialListing().length); - } - return files; - } - - @Override // ClientProtocol - public HdfsFileStatus getFileInfo(String src) throws IOException { - metrics.incrFileInfoOps(); - return namesystem.getFileInfo(src, true); - } - - @Override // ClientProtocol - public HdfsFileStatus getFileLinkInfo(String src) throws IOException { - metrics.incrFileInfoOps(); - return namesystem.getFileInfo(src, false); - } - - @Override - public long[] getStats() { - return namesystem.getStats(); - } - - @Override // ClientProtocol - public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) - throws IOException { - DatanodeInfo results[] = namesystem.datanodeReport(type); - if (results == null ) { - throw new IOException("Cannot find datanode report"); - } - return results; - } - - @Override // ClientProtocol - public boolean setSafeMode(SafeModeAction action) throws IOException { - return namesystem.setSafeMode(action); - } - /** * Is the cluster currently in safe mode? 
*/ @@ -1032,257 +512,6 @@ public boolean isInSafeMode() { return namesystem.isInSafeMode(); } - @Override // ClientProtocol - public boolean restoreFailedStorage(String arg) - throws AccessControlException { - return namesystem.restoreFailedStorage(arg); - } - - @Override // ClientProtocol - public void saveNamespace() throws IOException { - namesystem.saveNamespace(); - } - - @Override // ClientProtocol - public void refreshNodes() throws IOException { - namesystem.getBlockManager().getDatanodeManager().refreshNodes( - new HdfsConfiguration()); - } - - @Override // NamenodeProtocol - public long getTransactionID() { - return namesystem.getEditLog().getSyncTxId(); - } - - @Override // NamenodeProtocol - public CheckpointSignature rollEditLog() throws IOException { - return namesystem.rollEditLog(); - } - - @Override - public RemoteEditLogManifest getEditLogManifest(long sinceTxId) - throws IOException { - return namesystem.getEditLog().getEditLogManifest(sinceTxId); - } - - @Override // ClientProtocol - public void finalizeUpgrade() throws IOException { - namesystem.finalizeUpgrade(); - } - - @Override // ClientProtocol - public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) - throws IOException { - return namesystem.distributedUpgradeProgress(action); - } - - @Override // ClientProtocol - public void metaSave(String filename) throws IOException { - namesystem.metaSave(filename); - } - - @Override // ClientProtocol - public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) - throws IOException { - Collection fbs = - namesystem.listCorruptFileBlocks(path, cookie); - - String[] files = new String[fbs.size()]; - String lastCookie = ""; - int i = 0; - for(FSNamesystem.CorruptFileBlockInfo fb: fbs) { - files[i++] = fb.path; - lastCookie = fb.block.getBlockName(); - } - return new CorruptFileBlocks(files, lastCookie); - } - - /** - * Tell all datanodes to use a new, non-persistent bandwidth value for - * dfs.datanode.balance.bandwidthPerSec. - * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes. - * @throws IOException - */ - @Override // ClientProtocol - public void setBalancerBandwidth(long bandwidth) throws IOException { - namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth); - } - - @Override // ClientProtocol - public ContentSummary getContentSummary(String path) throws IOException { - return namesystem.getContentSummary(path); - } - - @Override // ClientProtocol - public void setQuota(String path, long namespaceQuota, long diskspaceQuota) - throws IOException { - namesystem.setQuota(path, namespaceQuota, diskspaceQuota); - } - - @Override // ClientProtocol - public void fsync(String src, String clientName) throws IOException { - namesystem.fsync(src, clientName); - } - - @Override // ClientProtocol - public void setTimes(String src, long mtime, long atime) - throws IOException { - namesystem.setTimes(src, mtime, atime); - } - - @Override // ClientProtocol - public void createSymlink(String target, String link, FsPermission dirPerms, - boolean createParent) throws IOException { - metrics.incrCreateSymlinkOps(); - /* We enforce the MAX_PATH_LENGTH limit even though a symlink target - * URI may refer to a non-HDFS file system. 
- */ - if (!checkPathLength(link)) { - throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH + - " character limit"); - - } - if ("".equals(target)) { - throw new IOException("Invalid symlink target"); - } - final UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - namesystem.createSymlink(target, link, - new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent); - } - - @Override // ClientProtocol - public String getLinkTarget(String path) throws IOException { - metrics.incrGetLinkTargetOps(); - /* Resolves the first symlink in the given path, returning a - * new path consisting of the target of the symlink and any - * remaining path components from the original path. - */ - try { - HdfsFileStatus stat = namesystem.getFileInfo(path, false); - if (stat != null) { - // NB: getSymlink throws IOException if !stat.isSymlink() - return stat.getSymlink(); - } - } catch (UnresolvedPathException e) { - return e.getResolvedPath().toString(); - } catch (UnresolvedLinkException e) { - // The NameNode should only throw an UnresolvedPathException - throw new AssertionError("UnresolvedLinkException thrown"); - } - return null; - } - - - @Override // DatanodeProtocol - public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) - throws IOException { - verifyVersion(nodeReg.getVersion()); - namesystem.registerDatanode(nodeReg); - - return nodeReg; - } - - @Override // DatanodeProtocol - public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg, - long capacity, long dfsUsed, long remaining, long blockPoolUsed, - int xmitsInProgress, int xceiverCount, int failedVolumes) - throws IOException { - verifyRequest(nodeReg); - return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining, - blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes); - } - - @Override // DatanodeProtocol - public DatanodeCommand blockReport(DatanodeRegistration nodeReg, - String poolId, long[] blocks) throws IOException { - verifyRequest(nodeReg); - BlockListAsLongs blist = new BlockListAsLongs(blocks); - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.blockReport: " - + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks() - + " blocks"); - } - - namesystem.getBlockManager().processReport(nodeReg, poolId, blist); - if (getFSImage().isUpgradeFinalized()) - return new DatanodeCommand.Finalize(poolId); - return null; - } - - @Override // DatanodeProtocol - public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, - ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException { - verifyRequest(nodeReg); - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " - +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length - +" blocks."); - } - namesystem.getBlockManager().blockReceivedAndDeleted( - nodeReg, poolId, receivedAndDeletedBlocks); - } - - @Override // DatanodeProtocol - public void errorReport(DatanodeRegistration nodeReg, - int errorCode, String msg) throws IOException { - String dnName = (nodeReg == null ? 
"unknown DataNode" : nodeReg.getName()); - - if (errorCode == DatanodeProtocol.NOTIFY) { - LOG.info("Error report from " + dnName + ": " + msg); - return; - } - verifyRequest(nodeReg); - - if (errorCode == DatanodeProtocol.DISK_ERROR) { - LOG.warn("Disk error on " + dnName + ": " + msg); - } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) { - LOG.warn("Fatal disk error on " + dnName + ": " + msg); - namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg); - } else { - LOG.info("Error report from " + dnName + ": " + msg); - } - } - - @Override // DatanodeProtocol, NamenodeProtocol - public NamespaceInfo versionRequest() throws IOException { - return namesystem.getNamespaceInfo(); - } - - @Override // DatanodeProtocol - public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException { - return namesystem.processDistributedUpgradeCommand(comm); - } - - /** - * Verify request. - * - * Verifies correctness of the datanode version, registration ID, and - * if the datanode does not need to be shutdown. - * - * @param nodeReg data node registration - * @throws IOException - */ - void verifyRequest(NodeRegistration nodeReg) throws IOException { - verifyVersion(nodeReg.getVersion()); - if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) { - LOG.warn("Invalid registrationID - expected: " - + namesystem.getRegistrationID() + " received: " - + nodeReg.getRegistrationID()); - throw new UnregisteredNodeException(nodeReg); - } - } - - /** - * Verify version. - * - * @param version - * @throws IOException - */ - void verifyVersion(int version) throws IOException { - if (version != HdfsConstants.LAYOUT_VERSION) - throw new IncorrectVersionException(version, "data node"); - } - /** get FSImage */ FSImage getFSImage() { return namesystem.dir.fsImage; @@ -1293,7 +522,7 @@ FSImage getFSImage() { * @return namenode rpc address */ public InetSocketAddress getNameNodeAddress() { - return rpcAddress; + return rpcServer.getRpcAddress(); } /** @@ -1302,7 +531,7 @@ public InetSocketAddress getNameNodeAddress() { * @return namenode service rpc address used by datanodes */ public InetSocketAddress getServiceRpcAddress() { - return serviceRPCAddress != null ? serviceRPCAddress : rpcAddress; + return rpcServer.getServiceRpcAddress() != null ? rpcServer.getServiceRpcAddress() : rpcServer.getRpcAddress(); } /** @@ -1387,40 +616,6 @@ private static boolean finalize(Configuration conf, return false; } - @Override // RefreshAuthorizationPolicyProtocol - public void refreshServiceAcl() throws IOException { - if (!serviceAuthEnabled) { - throw new AuthorizationException("Service Level Authorization not enabled!"); - } - - this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider()); - if (this.serviceRpcServer != null) { - this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider()); - } - } - - @Override // RefreshAuthorizationPolicyProtocol - public void refreshUserToGroupsMappings() throws IOException { - LOG.info("Refreshing all user-to-groups mappings. 
Requested by user: " + - UserGroupInformation.getCurrentUser().getShortUserName()); - Groups.getUserToGroupsMappingService().refresh(); - } - - @Override // RefreshAuthorizationPolicyProtocol - public void refreshSuperUserGroupsConfiguration() { - LOG.info("Refreshing SuperUser proxy group mapping list "); - - ProxyUsers.refreshSuperUserGroupsConfiguration(); - } - - @Override // GetUserMappingsProtocol - public String[] getGroupsForUser(String user) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Getting groups for user " + user); - } - return UserGroupInformation.createRemoteUser(user).getGroupNames(); - } - private static void printUsage() { System.err.println( "Usage: java NameNode [" + @@ -1593,11 +788,4 @@ public static void main(String argv[]) throws Exception { } } - private static String getClientMachine() { - String clientMachine = Server.getRemoteAddress(); - if (clientMachine == null) { - clientMachine = ""; - } - return clientMachine; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java new file mode 100644 index 0000000000..b0411bdd84 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -0,0 +1,918 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import org.apache.hadoop.hdfs.HDFSPolicyProvider; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; +import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.Node; +import 
org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.security.RefreshUserMappingsProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.tools.GetUserMappingsProtocol; + +/** + * This class is responsible for handling all of the RPC calls to the NameNode. + * It is created, started, and stopped by {@link NameNode}. + */ +class NameNodeRpcServer implements NamenodeProtocols { + + private static final Log LOG = NameNode.LOG; + private static final Log stateChangeLog = NameNode.stateChangeLog; + + // Dependencies from other parts of NN. + private final FSNamesystem namesystem; + protected final NameNode nn; + private final NameNodeMetrics metrics; + + private final boolean serviceAuthEnabled; + + /** The RPC server that listens to requests from DataNodes */ + private final RPC.Server serviceRpcServer; + private final InetSocketAddress serviceRPCAddress; + + /** The RPC server that listens to requests from clients */ + protected final RPC.Server server; + protected final InetSocketAddress rpcAddress; + + public NameNodeRpcServer(Configuration conf, NameNode nn) + throws IOException { + this.nn = nn; + this.namesystem = nn.getNamesystem(); + this.metrics = NameNode.getNameNodeMetrics(); + + int handlerCount = + conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, + DFS_DATANODE_HANDLER_COUNT_DEFAULT); + InetSocketAddress socAddr = nn.getRpcServerAddress(conf); + + InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf); + if (dnSocketAddr != null) { + int serviceHandlerCount = + conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, + DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); + this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this, + dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, + false, conf, namesystem.getDelegationTokenSecretManager()); + this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); + nn.setRpcServiceServerAddress(conf, serviceRPCAddress); + } else { + serviceRpcServer = null; + serviceRPCAddress = null; + } + this.server = RPC.getServer(NamenodeProtocols.class, this, + socAddr.getHostName(), socAddr.getPort(), + handlerCount, false, conf, + namesystem.getDelegationTokenSecretManager()); + + // set service-level authorization security policy + if (serviceAuthEnabled = + conf.getBoolean( + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { + this.server.refreshServiceAcl(conf, new HDFSPolicyProvider()); + if (this.serviceRpcServer != null) { + this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); + } + } + + // The rpc-server port can be ephemeral... ensure we have the correct info + this.rpcAddress = this.server.getListenerAddress(); + nn.setRpcServerAddress(conf, rpcAddress); + } + + /** + * Actually start serving requests. + */ + void start() { + server.start(); //start RPC server + if (serviceRpcServer != null) { + serviceRpcServer.start(); + } + } + + /** + * Wait until the RPC server has shut down. 
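
As the constructor above shows, the second (service) RPC.Server is only created when a service RPC address is configured; DataNodes and the BackupNode then use that endpoint while clients keep the default address. A hedged configuration sketch, assuming the usual DFSConfigKeys constant for the service address; host names and ports are placeholders.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class ServiceRpcConfigSketch {
      static Configuration withSeparateServiceRpc() {
        Configuration conf = new HdfsConfiguration();
        // Client-facing RPC endpoint (the default filesystem URI).
        FileSystem.setDefaultUri(conf, URI.create("hdfs://nn.example.com:8020"));
        // Separate endpoint for DataNodes/BackupNode; when this key is unset,
        // getServiceRpcServerAddress() returns null and no serviceRpcServer is built.
        conf.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
            "nn.example.com:8021");
        return conf;
      }
    }
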
+ */ + void join() throws InterruptedException { + this.server.join(); + } + + void stop() { + if(server != null) server.stop(); + if(serviceRpcServer != null) serviceRpcServer.stop(); + } + + InetSocketAddress getServiceRpcAddress() { + return serviceRPCAddress; + } + + InetSocketAddress getRpcAddress() { + return rpcAddress; + } + + @Override // VersionedProtocol + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return ProtocolSignature.getProtocolSignature( + this, protocol, clientVersion, clientMethodsHash); + } + + @Override + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + if (protocol.equals(ClientProtocol.class.getName())) { + return ClientProtocol.versionID; + } else if (protocol.equals(DatanodeProtocol.class.getName())){ + return DatanodeProtocol.versionID; + } else if (protocol.equals(NamenodeProtocol.class.getName())){ + return NamenodeProtocol.versionID; + } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){ + return RefreshAuthorizationPolicyProtocol.versionID; + } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){ + return RefreshUserMappingsProtocol.versionID; + } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){ + return GetUserMappingsProtocol.versionID; + } else { + throw new IOException("Unknown protocol to name node: " + protocol); + } + } + + ///////////////////////////////////////////////////// + // NamenodeProtocol + ///////////////////////////////////////////////////// + @Override // NamenodeProtocol + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) + throws IOException { + if(size <= 0) { + throw new IllegalArgumentException( + "Unexpected not positive size: "+size); + } + + return namesystem.getBlockManager().getBlocks(datanode, size); + } + + @Override // NamenodeProtocol + public ExportedBlockKeys getBlockKeys() throws IOException { + return namesystem.getBlockManager().getBlockKeys(); + } + + @Override // NamenodeProtocol + public void errorReport(NamenodeRegistration registration, + int errorCode, + String msg) throws IOException { + verifyRequest(registration); + LOG.info("Error report from " + registration + ": " + msg); + if(errorCode == FATAL) + namesystem.releaseBackupNode(registration); + } + + @Override // NamenodeProtocol + public NamenodeRegistration register(NamenodeRegistration registration) + throws IOException { + verifyVersion(registration.getVersion()); + NamenodeRegistration myRegistration = nn.setRegistration(); + namesystem.registerBackupNode(registration, myRegistration); + return myRegistration; + } + + @Override // NamenodeProtocol + public NamenodeCommand startCheckpoint(NamenodeRegistration registration) + throws IOException { + verifyRequest(registration); + if(!nn.isRole(NamenodeRole.NAMENODE)) + throw new IOException("Only an ACTIVE node can invoke startCheckpoint."); + return namesystem.startCheckpoint(registration, nn.setRegistration()); + } + + @Override // NamenodeProtocol + public void endCheckpoint(NamenodeRegistration registration, + CheckpointSignature sig) throws IOException { + verifyRequest(registration); + if(!nn.isRole(NamenodeRole.NAMENODE)) + throw new IOException("Only an ACTIVE node can invoke endCheckpoint."); + namesystem.endCheckpoint(registration, sig); + } + + @Override // ClientProtocol + public Token getDelegationToken(Text renewer) + throws IOException { + return 
namesystem.getDelegationToken(renewer); + } + + @Override // ClientProtocol + public long renewDelegationToken(Token token) + throws InvalidToken, IOException { + return namesystem.renewDelegationToken(token); + } + + @Override // ClientProtocol + public void cancelDelegationToken(Token token) + throws IOException { + namesystem.cancelDelegationToken(token); + } + + @Override // ClientProtocol + public LocatedBlocks getBlockLocations(String src, + long offset, + long length) + throws IOException { + metrics.incrGetBlockLocations(); + return namesystem.getBlockLocations(getClientMachine(), + src, offset, length); + } + + @Override // ClientProtocol + public FsServerDefaults getServerDefaults() throws IOException { + return namesystem.getServerDefaults(); + } + + @Override // ClientProtocol + public void create(String src, + FsPermission masked, + String clientName, + EnumSetWritable flag, + boolean createParent, + short replication, + long blockSize) throws IOException { + String clientMachine = getClientMachine(); + if (stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*DIR* NameNode.create: file " + +src+" for "+clientName+" at "+clientMachine); + } + if (!checkPathLength(src)) { + throw new IOException("create: Pathname too long. Limit " + + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); + } + namesystem.startFile(src, + new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(), + null, masked), + clientName, clientMachine, flag.get(), createParent, replication, blockSize); + metrics.incrFilesCreated(); + metrics.incrCreateFileOps(); + } + + @Override // ClientProtocol + public LocatedBlock append(String src, String clientName) + throws IOException { + String clientMachine = getClientMachine(); + if (stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*DIR* NameNode.append: file " + +src+" for "+clientName+" at "+clientMachine); + } + LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine); + metrics.incrFilesAppended(); + return info; + } + + @Override // ClientProtocol + public boolean recoverLease(String src, String clientName) throws IOException { + String clientMachine = getClientMachine(); + return namesystem.recoverLease(src, clientName, clientMachine); + } + + @Override // ClientProtocol + public boolean setReplication(String src, short replication) + throws IOException { + return namesystem.setReplication(src, replication); + } + + @Override // ClientProtocol + public void setPermission(String src, FsPermission permissions) + throws IOException { + namesystem.setPermission(src, permissions); + } + + @Override // ClientProtocol + public void setOwner(String src, String username, String groupname) + throws IOException { + namesystem.setOwner(src, username, groupname); + } + + @Override // ClientProtocol + public LocatedBlock addBlock(String src, + String clientName, + ExtendedBlock previous, + DatanodeInfo[] excludedNodes) + throws IOException { + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + +src+" for "+clientName); + } + HashMap excludedNodesSet = null; + if (excludedNodes != null) { + excludedNodesSet = new HashMap(excludedNodes.length); + for (Node node:excludedNodes) { + excludedNodesSet.put(node, node); + } + } + LocatedBlock locatedBlock = + namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet); + if (locatedBlock != null) + metrics.incrAddBlockOps(); + return locatedBlock; + } + + @Override // ClientProtocol + public 
LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk, + final DatanodeInfo[] existings, final DatanodeInfo[] excludes, + final int numAdditionalNodes, final String clientName + ) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("getAdditionalDatanode: src=" + src + + ", blk=" + blk + + ", existings=" + Arrays.asList(existings) + + ", excludes=" + Arrays.asList(excludes) + + ", numAdditionalNodes=" + numAdditionalNodes + + ", clientName=" + clientName); + } + + metrics.incrGetAdditionalDatanodeOps(); + + HashMap excludeSet = null; + if (excludes != null) { + excludeSet = new HashMap(excludes.length); + for (Node node : excludes) { + excludeSet.put(node, node); + } + } + return namesystem.getAdditionalDatanode(src, blk, + existings, excludeSet, numAdditionalNodes, clientName); + } + + /** + * The client needs to give up on the block. + */ + public void abandonBlock(ExtendedBlock b, String src, String holder) + throws IOException { + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: " + +b+" of file "+src); + } + if (!namesystem.abandonBlock(b, src, holder)) { + throw new IOException("Cannot abandon block during write to " + src); + } + } + + @Override // ClientProtocol + public boolean complete(String src, String clientName, ExtendedBlock last) + throws IOException { + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*DIR* NameNode.complete: " + + src + " for " + clientName); + } + return namesystem.completeFile(src, clientName, last); + } + + /** + * The client has detected an error on the specified located blocks + * and is reporting them to the server. For now, the namenode will + * mark the block as corrupt. In the future we might + * check the blocks are actually corrupt. + */ + @Override + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + stateChangeLog.info("*DIR* NameNode.reportBadBlocks"); + for (int i = 0; i < blocks.length; i++) { + ExtendedBlock blk = blocks[i].getBlock(); + DatanodeInfo[] nodes = blocks[i].getLocations(); + for (int j = 0; j < nodes.length; j++) { + DatanodeInfo dn = nodes[j]; + namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn); + } + } + } + + @Override // ClientProtocol + public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName) + throws IOException { + return namesystem.updateBlockForPipeline(block, clientName); + } + + + @Override // ClientProtocol + public void updatePipeline(String clientName, ExtendedBlock oldBlock, + ExtendedBlock newBlock, DatanodeID[] newNodes) + throws IOException { + namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes); + } + + @Override // DatanodeProtocol + public void commitBlockSynchronization(ExtendedBlock block, + long newgenerationstamp, long newlength, + boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) + throws IOException { + namesystem.commitBlockSynchronization(block, + newgenerationstamp, newlength, closeFile, deleteblock, newtargets); + } + + @Override // ClientProtocol + public long getPreferredBlockSize(String filename) + throws IOException { + return namesystem.getPreferredBlockSize(filename); + } + + @Deprecated + @Override // ClientProtocol + public boolean rename(String src, String dst) throws IOException { + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); + } + if (!checkPathLength(dst)) { + throw new IOException("rename: Pathname too long. 
Limit " + + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); + } + boolean ret = namesystem.renameTo(src, dst); + if (ret) { + metrics.incrFilesRenamed(); + } + return ret; + } + + @Override // ClientProtocol + public void concat(String trg, String[] src) throws IOException { + namesystem.concat(trg, src); + } + + @Override // ClientProtocol + public void rename(String src, String dst, Options.Rename... options) + throws IOException { + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); + } + if (!checkPathLength(dst)) { + throw new IOException("rename: Pathname too long. Limit " + + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); + } + namesystem.renameTo(src, dst, options); + metrics.incrFilesRenamed(); + } + + @Deprecated + @Override // ClientProtocol + public boolean delete(String src) throws IOException { + return delete(src, true); + } + + @Override // ClientProtocol + public boolean delete(String src, boolean recursive) throws IOException { + if (stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*DIR* Namenode.delete: src=" + src + + ", recursive=" + recursive); + } + boolean ret = namesystem.delete(src, recursive); + if (ret) + metrics.incrDeleteFileOps(); + return ret; + } + + /** + * Check path length does not exceed maximum. Returns true if + * length and depth are okay. Returns false if length is too long + * or depth is too great. + */ + private boolean checkPathLength(String src) { + Path srcPath = new Path(src); + return (src.length() <= MAX_PATH_LENGTH && + srcPath.depth() <= MAX_PATH_DEPTH); + } + + @Override // ClientProtocol + public boolean mkdirs(String src, FsPermission masked, boolean createParent) + throws IOException { + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src); + } + if (!checkPathLength(src)) { + throw new IOException("mkdirs: Pathname too long. 
Limit " + + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); + } + return namesystem.mkdirs(src, + new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(), + null, masked), createParent); + } + + @Override // ClientProtocol + public void renewLease(String clientName) throws IOException { + namesystem.renewLease(clientName); + } + + @Override // ClientProtocol + public DirectoryListing getListing(String src, byte[] startAfter, + boolean needLocation) + throws IOException { + DirectoryListing files = namesystem.getListing( + src, startAfter, needLocation); + if (files != null) { + metrics.incrGetListingOps(); + metrics.incrFilesInGetListingOps(files.getPartialListing().length); + } + return files; + } + + @Override // ClientProtocol + public HdfsFileStatus getFileInfo(String src) throws IOException { + metrics.incrFileInfoOps(); + return namesystem.getFileInfo(src, true); + } + + @Override // ClientProtocol + public HdfsFileStatus getFileLinkInfo(String src) throws IOException { + metrics.incrFileInfoOps(); + return namesystem.getFileInfo(src, false); + } + + @Override + public long[] getStats() { + return namesystem.getStats(); + } + + @Override // ClientProtocol + public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) + throws IOException { + DatanodeInfo results[] = namesystem.datanodeReport(type); + if (results == null ) { + throw new IOException("Cannot find datanode report"); + } + return results; + } + + @Override // ClientProtocol + public boolean setSafeMode(SafeModeAction action) throws IOException { + return namesystem.setSafeMode(action); + } + + @Override // ClientProtocol + public boolean restoreFailedStorage(String arg) + throws AccessControlException { + return namesystem.restoreFailedStorage(arg); + } + + @Override // ClientProtocol + public void saveNamespace() throws IOException { + namesystem.saveNamespace(); + } + + @Override // ClientProtocol + public void refreshNodes() throws IOException { + namesystem.getBlockManager().getDatanodeManager().refreshNodes( + new HdfsConfiguration()); + } + + @Override // NamenodeProtocol + public long getTransactionID() { + return namesystem.getEditLog().getSyncTxId(); + } + + @Override // NamenodeProtocol + public CheckpointSignature rollEditLog() throws IOException { + return namesystem.rollEditLog(); + } + + @Override + public RemoteEditLogManifest getEditLogManifest(long sinceTxId) + throws IOException { + return namesystem.getEditLog().getEditLogManifest(sinceTxId); + } + + @Override // ClientProtocol + public void finalizeUpgrade() throws IOException { + namesystem.finalizeUpgrade(); + } + + @Override // ClientProtocol + public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) + throws IOException { + return namesystem.distributedUpgradeProgress(action); + } + + @Override // ClientProtocol + public void metaSave(String filename) throws IOException { + namesystem.metaSave(filename); + } + + @Override // ClientProtocol + public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) + throws IOException { + Collection fbs = + namesystem.listCorruptFileBlocks(path, cookie); + + String[] files = new String[fbs.size()]; + String lastCookie = ""; + int i = 0; + for(FSNamesystem.CorruptFileBlockInfo fb: fbs) { + files[i++] = fb.path; + lastCookie = fb.block.getBlockName(); + } + return new CorruptFileBlocks(files, lastCookie); + } + + /** + * Tell all datanodes to use a new, non-persistent bandwidth value for + * dfs.datanode.balance.bandwidthPerSec. 
+ * @param bandwidth Balancer bandwidth in bytes per second for all datanodes. + * @throws IOException + */ + @Override // ClientProtocol + public void setBalancerBandwidth(long bandwidth) throws IOException { + namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth); + } + + @Override // ClientProtocol + public ContentSummary getContentSummary(String path) throws IOException { + return namesystem.getContentSummary(path); + } + + @Override // ClientProtocol + public void setQuota(String path, long namespaceQuota, long diskspaceQuota) + throws IOException { + namesystem.setQuota(path, namespaceQuota, diskspaceQuota); + } + + @Override // ClientProtocol + public void fsync(String src, String clientName) throws IOException { + namesystem.fsync(src, clientName); + } + + @Override // ClientProtocol + public void setTimes(String src, long mtime, long atime) + throws IOException { + namesystem.setTimes(src, mtime, atime); + } + + @Override // ClientProtocol + public void createSymlink(String target, String link, FsPermission dirPerms, + boolean createParent) throws IOException { + metrics.incrCreateSymlinkOps(); + /* We enforce the MAX_PATH_LENGTH limit even though a symlink target + * URI may refer to a non-HDFS file system. + */ + if (!checkPathLength(link)) { + throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH + + " character limit"); + + } + if ("".equals(target)) { + throw new IOException("Invalid symlink target"); + } + final UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + namesystem.createSymlink(target, link, + new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent); + } + + @Override // ClientProtocol + public String getLinkTarget(String path) throws IOException { + metrics.incrGetLinkTargetOps(); + /* Resolves the first symlink in the given path, returning a + * new path consisting of the target of the symlink and any + * remaining path components from the original path. 
+ */ + try { + HdfsFileStatus stat = namesystem.getFileInfo(path, false); + if (stat != null) { + // NB: getSymlink throws IOException if !stat.isSymlink() + return stat.getSymlink(); + } + } catch (UnresolvedPathException e) { + return e.getResolvedPath().toString(); + } catch (UnresolvedLinkException e) { + // The NameNode should only throw an UnresolvedPathException + throw new AssertionError("UnresolvedLinkException thrown"); + } + return null; + } + + + @Override // DatanodeProtocol + public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) + throws IOException { + verifyVersion(nodeReg.getVersion()); + namesystem.registerDatanode(nodeReg); + + return nodeReg; + } + + @Override // DatanodeProtocol + public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg, + long capacity, long dfsUsed, long remaining, long blockPoolUsed, + int xmitsInProgress, int xceiverCount, int failedVolumes) + throws IOException { + verifyRequest(nodeReg); + return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining, + blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes); + } + + @Override // DatanodeProtocol + public DatanodeCommand blockReport(DatanodeRegistration nodeReg, + String poolId, long[] blocks) throws IOException { + verifyRequest(nodeReg); + BlockListAsLongs blist = new BlockListAsLongs(blocks); + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*BLOCK* NameNode.blockReport: " + + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks() + + " blocks"); + } + + namesystem.getBlockManager().processReport(nodeReg, poolId, blist); + if (nn.getFSImage().isUpgradeFinalized()) + return new DatanodeCommand.Finalize(poolId); + return null; + } + + @Override // DatanodeProtocol + public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, + ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException { + verifyRequest(nodeReg); + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length + +" blocks."); + } + namesystem.getBlockManager().blockReceivedAndDeleted( + nodeReg, poolId, receivedAndDeletedBlocks); + } + + @Override // DatanodeProtocol + public void errorReport(DatanodeRegistration nodeReg, + int errorCode, String msg) throws IOException { + String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName()); + + if (errorCode == DatanodeProtocol.NOTIFY) { + LOG.info("Error report from " + dnName + ": " + msg); + return; + } + verifyRequest(nodeReg); + + if (errorCode == DatanodeProtocol.DISK_ERROR) { + LOG.warn("Disk error on " + dnName + ": " + msg); + } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) { + LOG.warn("Fatal disk error on " + dnName + ": " + msg); + namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg); + } else { + LOG.info("Error report from " + dnName + ": " + msg); + } + } + + @Override // DatanodeProtocol, NamenodeProtocol + public NamespaceInfo versionRequest() throws IOException { + return namesystem.getNamespaceInfo(); + } + + @Override // DatanodeProtocol + public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException { + return namesystem.processDistributedUpgradeCommand(comm); + } + + /** + * Verify request. + * + * Verifies correctness of the datanode version, registration ID, and + * if the datanode does not need to be shutdown. 
+ * + * @param nodeReg data node registration + * @throws IOException + */ + void verifyRequest(NodeRegistration nodeReg) throws IOException { + verifyVersion(nodeReg.getVersion()); + if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID())) { + LOG.warn("Invalid registrationID - expected: " + + namesystem.getRegistrationID() + " received: " + + nodeReg.getRegistrationID()); + throw new UnregisteredNodeException(nodeReg); + } + } + + @Override // RefreshAuthorizationPolicyProtocol + public void refreshServiceAcl() throws IOException { + if (!serviceAuthEnabled) { + throw new AuthorizationException("Service Level Authorization not enabled!"); + } + + this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider()); + if (this.serviceRpcServer != null) { + this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider()); + } + } + + @Override // RefreshAuthorizationPolicyProtocol + public void refreshUserToGroupsMappings() throws IOException { + LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + + UserGroupInformation.getCurrentUser().getShortUserName()); + Groups.getUserToGroupsMappingService().refresh(); + } + + @Override // RefreshAuthorizationPolicyProtocol + public void refreshSuperUserGroupsConfiguration() { + LOG.info("Refreshing SuperUser proxy group mapping list "); + + ProxyUsers.refreshSuperUserGroupsConfiguration(); + } + + @Override // GetUserMappingsProtocol + public String[] getGroupsForUser(String user) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Getting groups for user " + user); + } + return UserGroupInformation.createRemoteUser(user).getGroupNames(); + } + + + /** + * Verify version. + * + * @param version + * @throws IOException + */ + void verifyVersion(int version) throws IOException { + if (version != HdfsConstants.LAYOUT_VERSION) + throw new IncorrectVersionException(version, "data node"); + } + + private static String getClientMachine() { + String clientMachine = Server.getRemoteAddress(); + if (clientMachine == null) { + clientMachine = ""; + } + return clientMachine; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index e74859af18..358d778eaf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -173,7 +173,7 @@ public void fsck() { out.println(msg); namenode.getNamesystem().logFsckEvent(path, remoteAddress); - final HdfsFileStatus file = namenode.getFileInfo(path); + final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path); if (file != null) { if (showCorruptFileBlocks) { @@ -250,7 +250,8 @@ private void check(String parent, HdfsFileStatus file, Result res) throws IOExce res.totalDirs++; do { assert lastReturnedName != null; - thisListing = namenode.getListing(path, lastReturnedName, false); + thisListing = namenode.getRpcServer().getListing( + path, lastReturnedName, false); if (thisListing == null) { return; } @@ -385,7 +386,7 @@ private void check(String parent, HdfsFileStatus file, Result res) throws IOExce break; case FIXING_DELETE: if (!isOpen) - namenode.delete(path, true); + namenode.getRpcServer().delete(path, true); } } if (showFiles) { @@ -414,7 +415,8 @@ private void lostFoundMove(String parent, 
HdfsFileStatus file, LocatedBlocks blo String target = lostFound + fullName; String errmsg = "Failed to move " + fullName + " to /lost+found"; try { - if (!namenode.mkdirs(target, file.getPermission(), true)) { + if (!namenode.getRpcServer().mkdirs( + target, file.getPermission(), true)) { LOG.warn(errmsg); return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java index de94cbeba0..3d2fd8b0be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; @@ -354,7 +355,7 @@ void generateHealthReport(JspWriter out, NameNode nn, } } - static String getDelegationToken(final NameNode nn, + static String getDelegationToken(final NamenodeProtocols nn, HttpServletRequest request, Configuration conf, final UserGroupInformation ugi) throws IOException, InterruptedException { Token token = ugi @@ -381,7 +382,8 @@ static void redirectToRandomDataNode(ServletContext context, .getAttribute(JspHelper.CURRENT_CONF); final DatanodeID datanode = getRandomDatanode(nn); UserGroupInformation ugi = JspHelper.getUGI(context, request, conf); - String tokenString = getDelegationToken(nn, request, conf, ugi); + String tokenString = getDelegationToken( + nn.getRpcServer(), request, conf, ugi); // if the user is defined, get a delegation token and stringify it final String redirectLocation; final String nodeToRedirect; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java index 5e2041cd38..ddd0acbbfb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java @@ -70,7 +70,7 @@ protected void doGet(final HttpServletRequest req, final HttpServletResponse res try { long result = ugi.doAs(new PrivilegedExceptionAction() { public Long run() throws Exception { - return nn.renewDelegationToken(token); + return nn.getRpcServer().renewDelegationToken(token); } }); PrintStream os = new PrintStream(resp.getOutputStream()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 16d2bc2112..e51401cfc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1025,6 +1025,14 @@ public NameNode getNameNode() { return getNameNode(0); } + /** + * Get an instance of the NameNode's RPC handler. 
+ */ + public NamenodeProtocols getNameNodeRpc() { + checkSingleNameNode(); + return getNameNode(0).getRpcServer(); + } + /** * Gets the NameNode for the index. May be null. */ @@ -1361,7 +1369,15 @@ public boolean isNameNodeUp(int nnIndex) { if (nameNode == null) { return false; } - long[] sizes = nameNode.getStats(); + long[] sizes; + try { + sizes = nameNode.getRpcServer().getStats(); + } catch (IOException ioe) { + // This method above should never throw. + // It only throws IOE since it is exposed via RPC + throw new AssertionError("Unexpected IOE thrown: " + + StringUtils.stringifyException(ioe)); + } boolean isUp = false; synchronized (this) { isUp = ((!nameNode.isInSafeMode() || !waitSafeMode) && sizes[0] != 0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index a65e6a233f..e7988f99bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.io.IOUtils; @@ -45,7 +46,7 @@ public class TestClientProtocolForPipelineRecovery { try { cluster.waitActive(); FileSystem fileSys = cluster.getFileSystem(); - NameNode namenode = cluster.getNameNode(); + NamenodeProtocols namenode = cluster.getNameNodeRpc(); /* Test writing to finalized replicas */ Path file = new Path("dataprotocol.dat"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index dfea83fdde..9cc1b2999c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Client; @@ -190,7 +191,7 @@ public void testNotYetReplicatedErrors() throws IOException final int maxRetries = 1; // Allow one retry (total of two calls) conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries); - NameNode mockNN = mock(NameNode.class); + NamenodeProtocols mockNN = mock(NamenodeProtocols.class); Answer answer = new ThrowsException(new IOException()) { int retryCount = 0; @@ -240,8 +241,8 @@ public void testFailuresArePerOperation() throws Exception try { cluster.waitActive(); FileSystem fs = cluster.getFileSystem(); - NameNode preSpyNN = cluster.getNameNode(); - NameNode spyNN = spy(preSpyNN); + NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); + NamenodeProtocols spyNN = spy(preSpyNN); DFSClient client = new DFSClient(null, spyNN, conf, 
null); int maxBlockAcquires = client.getMaxBlockAcquireFailures(); assertTrue(maxBlockAcquires > 0); @@ -305,11 +306,11 @@ public void testFailuresArePerOperation() throws Exception */ private static class FailNTimesAnswer implements Answer { private int failuresLeft; - private NameNode realNN; + private NamenodeProtocols realNN; - public FailNTimesAnswer(NameNode realNN, int timesToFail) { + public FailNTimesAnswer(NamenodeProtocols preSpyNN, int timesToFail) { failuresLeft = timesToFail; - this.realNN = realNN; + this.realNN = preSpyNN; } public LocatedBlocks answer(InvocationOnMock invocation) throws IOException { @@ -603,7 +604,8 @@ public void testGetFileChecksum() throws Exception { //stop the first datanode final List locatedblocks = DFSClient.callGetBlockLocations( - cluster.getNameNode(), f, 0, Long.MAX_VALUE).getLocatedBlocks(); + cluster.getNameNodeRpc(), f, 0, Long.MAX_VALUE) + .getLocatedBlocks(); final DatanodeInfo first = locatedblocks.get(0).getLocations()[0]; cluster.stopDataNode(first.getName()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index 15d7378b9f..3069727a48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -293,10 +293,11 @@ static void refreshNodes(final FSNamesystem ns, final Configuration conf } private void verifyStats(NameNode namenode, FSNamesystem fsn, - DatanodeInfo node, boolean decommissioning) throws InterruptedException { + DatanodeInfo node, boolean decommissioning) + throws InterruptedException, IOException { // Do the stats check over 10 iterations for (int i = 0; i < 10; i++) { - long[] newStats = namenode.getStats(); + long[] newStats = namenode.getRpcServer().getStats(); // For decommissioning nodes, ensure capacity of the DN is no longer // counted. 
Only used space of the DN is counted in cluster capacity diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java index 4425fcfbf0..1ba56d3844 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java @@ -36,7 +36,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -44,6 +43,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.Before; @@ -151,8 +151,8 @@ public void testRecoverFinalizedBlock() throws Throwable { try { cluster.waitActive(); - NameNode preSpyNN = cluster.getNameNode(); - NameNode spyNN = spy(preSpyNN); + NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); + NamenodeProtocols spyNN = spy(preSpyNN); // Delay completeFile GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG); @@ -222,8 +222,8 @@ public void testCompleteOtherLeaseHoldersFile() throws Throwable { try { cluster.waitActive(); - NameNode preSpyNN = cluster.getNameNode(); - NameNode spyNN = spy(preSpyNN); + NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); + NamenodeProtocols spyNN = spy(preSpyNN); // Delay completeFile GenericTestUtils.DelayAnswer delayer = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 1d7ff4e6e3..642388e42c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -420,7 +420,7 @@ public void testFileCreationError3() throws IOException { final Path f = new Path("/foo.txt"); createFile(dfs, f, 3); try { - cluster.getNameNode().addBlock(f.toString(), + cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName, null, null); fail(); } catch(IOException ioe) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java index 7bcc7d796e..80102582d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java @@ -106,7 +106,7 @@ public void testBlockSynchronization() throws Exception { DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName); - cluster.getNameNode().append(filestr, dfs.dfs.clientName); + cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName); // expire lease to trigger block recovery. 
waitLeaseRecovery(cluster); @@ -129,14 +129,14 @@ public void testBlockSynchronization() throws Exception { filestr = "/foo.safemode"; filepath = new Path(filestr); dfs.create(filepath, (short)1); - cluster.getNameNode().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); + cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); assertTrue(dfs.dfs.exists(filestr)); DFSTestUtil.waitReplication(dfs, filepath, (short)1); waitLeaseRecovery(cluster); // verify that we still cannot recover the lease LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem()); assertTrue("Found " + lm.countLease() + " lease, expected 1", lm.countLease() == 1); - cluster.getNameNode().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); } finally { if (cluster != null) {cluster.shutdown();} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java index 11b7f49d9d..1dc0b1ebd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPipelines.java @@ -100,7 +100,7 @@ public void pipeline_01() throws IOException { ofs.writeBytes("Some more stuff to write"); ((DFSOutputStream) ofs.getWrappedStream()).hflush(); - List lb = cluster.getNameNode().getBlockLocations( + List lb = cluster.getNameNodeRpc().getBlockLocations( filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks(); String bpid = cluster.getNamesystem().getBlockPoolId(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java index 2225449f19..337fa8a17c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.namenode.NNStorage; -import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; /** * This class defines a number of static helper methods used by the @@ -121,7 +121,7 @@ public static void initialize() throws Exception { .manageNameDfsDirs(false) .build(); - NameNode namenode = cluster.getNameNode(); + NamenodeProtocols namenode = cluster.getNameNodeRpc(); namenodeStorageNamespaceID = namenode.versionRequest().getNamespaceID(); namenodeStorageFsscTime = namenode.versionRequest().getCTime(); namenodeStorageClusterID = namenode.versionRequest().getClusterID(); @@ -517,7 +517,7 @@ public static int getCurrentLayoutVersion() { */ public static int getCurrentNamespaceID(MiniDFSCluster cluster) throws IOException { if (cluster != null) { - return cluster.getNameNode().versionRequest().getNamespaceID(); + return cluster.getNameNodeRpc().versionRequest().getNamespaceID(); } return namenodeStorageNamespaceID; } @@ -528,7 +528,7 @@ public static int getCurrentNamespaceID(MiniDFSCluster cluster) throws IOExcepti */ public static String getCurrentClusterID(MiniDFSCluster cluster) throws IOException { if (cluster != null) { - return 
cluster.getNameNode().versionRequest().getClusterID(); + return cluster.getNameNodeRpc().versionRequest().getClusterID(); } return namenodeStorageClusterID; } @@ -539,7 +539,7 @@ public static String getCurrentClusterID(MiniDFSCluster cluster) throws IOExcept */ public static String getCurrentBlockPoolID(MiniDFSCluster cluster) throws IOException { if (cluster != null) { - return cluster.getNameNode().versionRequest().getBlockPoolID(); + return cluster.getNameNodeRpc().versionRequest().getBlockPoolID(); } return namenodeStorageBlockPoolID; } @@ -554,7 +554,7 @@ public static String getCurrentBlockPoolID(MiniDFSCluster cluster) throws IOExce */ public static long getCurrentFsscTime(MiniDFSCluster cluster) throws IOException { if (cluster != null) { - return cluster.getNameNode().versionRequest().getCTime(); + return cluster.getNameNodeRpc().versionRequest().getCTime(); } return namenodeStorageFsscTime; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index 63dabde88f..9ad87fe087 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -375,11 +375,11 @@ public void testBlockTokenInLastLocatedBlock() throws IOException, Path filePath = new Path(fileName); FSDataOutputStream out = fs.create(filePath, (short) 1); out.write(new byte[1000]); - LocatedBlocks locatedBlocks = cluster.getNameNode().getBlockLocations( + LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations( fileName, 0, 1000); while (locatedBlocks.getLastLocatedBlock() == null) { Thread.sleep(100); - locatedBlocks = cluster.getNameNode().getBlockLocations(fileName, 0, + locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(fileName, 0, 1000); } Token token = locatedBlocks.getLastLocatedBlock() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java index 8afedc8bb1..45f41dc7ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java @@ -89,7 +89,7 @@ private static class Suite { this.cluster = cluster; clients = new ClientProtocol[nNameNodes]; for(int i = 0; i < nNameNodes; i++) { - clients[i] = cluster.getNameNode(i); + clients[i] = cluster.getNameNode(i).getRpcServer(); } replication = (short)Math.max(1, nDataNodes - 1); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 1eef522f83..d9309edc1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.balancer.TestBalancer; 
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -314,6 +315,7 @@ public void testRead() throws Exception { assertEquals(numDataNodes, cluster.getDataNodes().size()); final NameNode nn = cluster.getNameNode(); + final NamenodeProtocols nnProto = nn.getRpcServer(); final BlockManager bm = nn.getNamesystem().getBlockManager(); final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); @@ -344,7 +346,7 @@ public void testRead() throws Exception { new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); - List locatedBlocks = nn.getBlockLocations( + List locatedBlocks = nnProto.getBlockLocations( FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks(); LocatedBlock lblock = locatedBlocks.get(0); // first block Token myToken = lblock.getBlockToken(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java index cb3ef058a1..5fafc7788e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.test.GenericTestUtils; @@ -139,7 +138,7 @@ public void blockReport_01() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNode().blockReport(dnR, poolId, + cluster.getNameNodeRpc().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); List blocksAfterReport = @@ -181,9 +180,10 @@ public void blockReport_02() throws IOException { List blocks2Remove = new ArrayList(); List removedIndex = new ArrayList(); - List lBlocks = cluster.getNameNode().getBlockLocations( - filePath.toString(), FILE_START, - FILE_SIZE).getLocatedBlocks(); + List lBlocks = + cluster.getNameNodeRpc().getBlockLocations( + filePath.toString(), FILE_START, + FILE_SIZE).getLocatedBlocks(); while (removedIndex.size() != 2) { int newRemoveIndex = rand.nextInt(lBlocks.size()); @@ -218,7 +218,7 @@ public void blockReport_02() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNode().blockReport(dnR, poolId, + cluster.getNameNodeRpc().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() @@ -258,7 +258,8 @@ public void blockReport_03() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); 
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId, + DatanodeCommand dnCmd = + cluster.getNameNodeRpc().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); if(LOG.isDebugEnabled()) { LOG.debug("Got the command: " + dnCmd); @@ -310,7 +311,7 @@ public void blockReport_06() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNode().blockReport(dnR, poolId, + cluster.getNameNodeRpc().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of PendingReplication Blocks", @@ -359,7 +360,7 @@ public void blockReport_07() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNode().blockReport(dnR, poolId, + cluster.getNameNodeRpc().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of Corrupted blocks", @@ -381,7 +382,7 @@ public void blockReport_07() throws IOException { LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName()); } - cluster.getNameNode().blockReport(dnR, poolId, + cluster.getNameNodeRpc().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); @@ -431,7 +432,7 @@ public void blockReport_08() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNode().blockReport(dnR, poolId, + cluster.getNameNodeRpc().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of PendingReplication blocks", @@ -477,7 +478,7 @@ public void blockReport_09() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNode().blockReport(dnR, poolId, + cluster.getNameNodeRpc().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of PendingReplication blocks", @@ -590,7 +591,7 @@ private ArrayList prepareForRide(final Path filePath, DFSTestUtil.createFile(fs, filePath, fileSize, REPL_FACTOR, rand.nextLong()); - return locatedToBlocks(cluster.getNameNode() + return locatedToBlocks(cluster.getNameNodeRpc() .getBlockLocations(filePath.toString(), FILE_START, fileSize).getLocatedBlocks(), null); } @@ -707,7 +708,8 @@ private void corruptBlockGS(final Block block) private Block findBlock(Path path, long size) throws IOException { Block ret; List lbs = - cluster.getNameNode().getBlockLocations(path.toString(), + cluster.getNameNodeRpc() + .getBlockLocations(path.toString(), FILE_START, size).getLocatedBlocks(); LocatedBlock lb = lbs.get(lbs.size() - 1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 58eac953ad..a541bcb5d2 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -40,8 +40,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.net.NetUtils; @@ -144,7 +144,7 @@ public void testVolumeFailure() throws IOException { String bpid = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid); long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs(); - cluster.getNameNode().blockReport(dnR, bpid, bReport); + cluster.getNameNodeRpc().blockReport(dnR, bpid, bReport); // verify number of blocks and files... verify(filename, filesize); @@ -216,7 +216,7 @@ private void verify(String fn, int fs) throws IOException{ * @throws IOException */ private void triggerFailure(String path, long size) throws IOException { - NameNode nn = cluster.getNameNode(); + NamenodeProtocols nn = cluster.getNameNodeRpc(); List locatedBlocks = nn.getBlockLocations(path, 0, size).getLocatedBlocks(); @@ -291,7 +291,7 @@ private int countNNBlocks(Map map, String path, long size) throws IOException { int total = 0; - NameNode nn = cluster.getNameNode(); + NamenodeProtocols nn = cluster.getNameNodeRpc(); List locatedBlocks = nn.getBlockLocations(path, 0, size).getLocatedBlocks(); //System.out.println("Number of blocks: " + locatedBlocks.size()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java index 17fc27c91a..7237f2a93e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java @@ -109,7 +109,7 @@ public void testTransferRbw() throws Exception { final DatanodeInfo oldnodeinfo; { - final DatanodeInfo[] datatnodeinfos = cluster.getNameNode( + final DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc( ).getDatanodeReport(DatanodeReportType.LIVE); Assert.assertEquals(2, datatnodeinfos.length); int i = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 56c7fbf31a..afc003f938 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; 
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -101,6 +102,7 @@ public class NNThroughputBenchmark { static Configuration config; static NameNode nameNode; + static NamenodeProtocols nameNodeProto; NNThroughputBenchmark(Configuration conf) throws IOException, LoginException { config = conf; @@ -120,6 +122,7 @@ public class NNThroughputBenchmark { // Start the NameNode String[] argv = new String[] {}; nameNode = NameNode.createNameNode(argv, config); + nameNodeProto = nameNode.getRpcServer(); } void close() throws IOException { @@ -265,9 +268,9 @@ private boolean isInPorgress() { } void cleanUp() throws IOException { - nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); if(!keepResults) - nameNode.delete(getBaseDir(), true); + nameNodeProto.delete(getBaseDir(), true); } int getNumOpsExecuted() { @@ -398,7 +401,7 @@ public String toString() { void benchmarkOne() throws IOException { for(int idx = 0; idx < opsPerThread; idx++) { if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0) - nameNode.refreshUserToGroupsMappings(); + nameNodeProto.refreshUserToGroupsMappings(); long stat = statsOp.executeOp(daemonId, idx, arg1); localNumOpsExecuted++; localCumulativeTime += stat; @@ -459,9 +462,9 @@ String getExecutionArgument(int daemonId) { */ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { - nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); long start = System.currentTimeMillis(); - nameNode.delete(BASE_DIR_NAME, true); + nameNodeProto.delete(BASE_DIR_NAME, true); long end = System.currentTimeMillis(); return end-start; } @@ -523,7 +526,7 @@ void parseArguments(List args) { void generateInputs(int[] opsPerThread) throws IOException { assert opsPerThread.length == numThreads : "Error opsPerThread.length"; - nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); // int generatedFileIdx = 0; LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName()); fileNames = new String[numThreads][]; @@ -555,12 +558,12 @@ long executeOp(int daemonId, int inputIdx, String clientName) throws IOException { long start = System.currentTimeMillis(); // dummyActionNoSynch(fileIdx); - nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(), + nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(), clientName, new EnumSetWritable(EnumSet .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE); long end = System.currentTimeMillis(); for(boolean written = !closeUponCreate; !written; - written = nameNode.complete(fileNames[daemonId][inputIdx], + written = nameNodeProto.complete(fileNames[daemonId][inputIdx], clientName, null)); return end-start; } @@ -628,11 +631,11 @@ void generateInputs(int[] opsPerThread) throws IOException { } // use the same files for open super.generateInputs(opsPerThread); - if(nameNode.getFileInfo(opCreate.getBaseDir()) != null - && nameNode.getFileInfo(getBaseDir()) == null) { - nameNode.rename(opCreate.getBaseDir(), getBaseDir()); + if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null + && nameNodeProto.getFileInfo(getBaseDir()) == null) { + nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir()); } - if(nameNode.getFileInfo(getBaseDir()) == 
null) { + if(nameNodeProto.getFileInfo(getBaseDir()) == null) { throw new IOException(getBaseDir() + " does not exist."); } } @@ -643,7 +646,7 @@ void generateInputs(int[] opsPerThread) throws IOException { long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = System.currentTimeMillis(); - nameNode.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE); + nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE); long end = System.currentTimeMillis(); return end-start; } @@ -671,7 +674,7 @@ String getOpName() { long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = System.currentTimeMillis(); - nameNode.delete(fileNames[daemonId][inputIdx], false); + nameNodeProto.delete(fileNames[daemonId][inputIdx], false); long end = System.currentTimeMillis(); return end-start; } @@ -699,7 +702,7 @@ String getOpName() { long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = System.currentTimeMillis(); - nameNode.getFileInfo(fileNames[daemonId][inputIdx]); + nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]); long end = System.currentTimeMillis(); return end-start; } @@ -741,7 +744,7 @@ void generateInputs(int[] opsPerThread) throws IOException { long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = System.currentTimeMillis(); - nameNode.rename(fileNames[daemonId][inputIdx], + nameNodeProto.rename(fileNames[daemonId][inputIdx], destNames[daemonId][inputIdx]); long end = System.currentTimeMillis(); return end-start; @@ -788,11 +791,11 @@ String getName() { void register() throws IOException { // get versions from the namenode - nsInfo = nameNode.versionRequest(); + nsInfo = nameNodeProto.versionRequest(); dnRegistration.setStorageInfo(new DataStorage(nsInfo, "")); DataNode.setNewStorageID(dnRegistration); // register datanode - dnRegistration = nameNode.registerDatanode(dnRegistration); + dnRegistration = nameNodeProto.registerDatanode(dnRegistration); } /** @@ -802,7 +805,7 @@ void register() throws IOException { void sendHeartbeat() throws IOException { // register datanode // TODO:FEDERATION currently a single block pool is supported - DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration, + DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { @@ -847,7 +850,7 @@ public int compareTo(String name) { int replicateBlocks() throws IOException { // register datanode // TODO:FEDERATION currently a single block pool is supported - DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration, + DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0); if (cmds != null) { for (DatanodeCommand cmd : cmds) { @@ -878,7 +881,7 @@ private int transferBlocks( Block blocks[], receivedDNReg.setStorageInfo( new DataStorage(nsInfo, dnInfo.getStorageID())); receivedDNReg.setInfoPort(dnInfo.getInfoPort()); - nameNode.blockReceivedAndDeleted(receivedDNReg, nameNode + nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode .getNamesystem().getBlockPoolId(), new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo( blocks[i], DataNode.EMPTY_DEL_HINT) }); @@ -969,14 +972,14 @@ void generateInputs(int[] ignore) throws IOException { FileNameGenerator nameGenerator; nameGenerator = new FileNameGenerator(getBaseDir(), 
@@ -969,14 +972,14 @@ void generateInputs(int[] ignore) throws IOException {
       FileNameGenerator nameGenerator;
       nameGenerator = new FileNameGenerator(getBaseDir(),
           100);
       String clientName = getClientName(007);
-      nameNode.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
-        nameNode.create(fileName, FsPermission.getDefault(), clientName,
+        nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
            new EnumSetWritable(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
         ExtendedBlock lastBlock = addBlocks(fileName, clientName);
-        nameNode.complete(fileName, clientName, lastBlock);
+        nameNodeProto.complete(fileName, clientName, lastBlock);
       }
       // prepare block reports
       for(int idx=0; idx < nrDatanodes; idx++) {
@@ -988,12 +991,12 @@ private ExtendedBlock addBlocks(String fileName, String clientName)
     throws IOException {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNode.addBlock(fileName, clientName, prevBlock, null);
+        LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, prevBlock, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
           datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
-          nameNode.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
+          nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
              .getBlock().getBlockPoolId(), new ReceivedDeletedBlockInfo[] {
              new ReceivedDeletedBlockInfo(loc
              .getBlock().getLocalBlock(), "") });
@@ -1013,7 +1016,7 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
       assert daemonId < numThreads : "Wrong daemonId.";
       TinyDatanode dn = datanodes[daemonId];
       long start = System.currentTimeMillis();
-      nameNode.blockReport(dn.dnRegistration, nameNode.getNamesystem()
+      nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
           .getBlockPoolId(), dn.getBlockReportList());
       long end = System.currentTimeMillis();
       return end-start;
@@ -1146,7 +1149,7 @@ private void decommissionNodes() throws IOException {
         LOG.info("Datanode " + dn.getName() + " is decommissioned.");
       }
       excludeFile.close();
-      nameNode.refreshNodes();
+      nameNodeProto.refreshNodes();
     }

     /**
@@ -1160,8 +1163,8 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
       assert daemonId < numThreads : "Wrong daemonId.";
       long start = System.currentTimeMillis();
       // compute data-node work
-      int work = BlockManagerTestUtil.getComputedDatanodeWork(nameNode
-          .getNamesystem().getBlockManager());
+      int work = BlockManagerTestUtil.getComputedDatanodeWork(
+          nameNode.getNamesystem().getBlockManager());
       long end = System.currentTimeMillis();
       numPendingBlocks += work;
       if(work == 0)
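Illustrative sketch (not part of the patch) of the create/addBlock/complete sequence the benchmark now issues against a NamenodeProtocols handle. The path, client name and replication factor below are made up; the handle is assumed to come from NameNode#getRpcServer(), the accessor used elsewhere in this patch, or from cluster.getNameNodeRpc() in tests:

    NamenodeProtocols nnRpc = nameNode.getRpcServer();        // assumed source of the handle
    String src = "/bench/file-0";                             // illustrative path
    String client = "bench-client";                           // illustrative client name
    nnRpc.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
    nnRpc.create(src, FsPermission.getDefault(), client,
        new EnumSetWritable(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)),
        true, (short) 3, BLOCK_SIZE);
    LocatedBlock blk = nnRpc.addBlock(src, client, null, null);
    nnRpc.complete(src, client, blk.getBlock());              // last block finalizes the file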
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index afa39dfcf2..ed511eea47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -47,14 +47,6 @@ public static LocatedBlocks getBlockLocations(NameNode namenode,
         src, offset, length, false, true);
   }

-  /**
-   * Get the internal RPC server instance.
-   * @return rpc server
-   */
-  public static Server getRpcServer(NameNode namenode) {
-    return namenode.server;
-  }
-
   public static DelegationTokenSecretManager getDtSecretManager(
       final FSNamesystem ns) {
     return ns.getDelegationTokenSecretManager();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
index 708a048b35..3fca8a3808 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
@@ -239,10 +239,10 @@ public Object run() throws IOException {
           LOG.info("Innocuous exception", e);
         }
         locatedBlocks = DFSClientAdapter.callGetBlockLocations(
-            cluster.getNameNode(), filePath, 0L, bytes.length);
+            cluster.getNameNodeRpc(), filePath, 0L, bytes.length);
       } while (locatedBlocks.isUnderConstruction());

       // Force a roll so we get an OP_END_LOG_SEGMENT txn
-      return cluster.getNameNode().rollEditLog();
+      return cluster.getNameNodeRpc().rollEditLog();
     }
   }
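With NameNodeAdapter.getRpcServer() removed, tests reach the same functionality through MiniDFSCluster. A minimal sketch, assuming a running cluster as in the OfflineEditsViewerHelper hunk above (path and length are illustrative):

    NamenodeProtocols nnRpc = cluster.getNameNodeRpc();    // replaces NameNodeAdapter.getRpcServer(nn)
    LocatedBlocks blocks = nnRpc.getBlockLocations("/some/file", 0L, 4096L);
    CheckpointSignature sig = nnRpc.rollEditLog();          // forces an OP_END_LOG_SEGMENT txn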
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
index 1e07e50208..2a27c37fc9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -128,12 +129,13 @@ public void testBackupNodeTailsEdits() throws Exception {
       fileSys = cluster.getFileSystem();
       backup = startBackupNode(conf, StartupOption.BACKUP, 1);

-      BackupImage bnImage = backup.getBNImage();
+      BackupImage bnImage = (BackupImage) backup.getFSImage();
       testBNInSync(cluster, backup, 1);

       // Force a roll -- BN should roll with NN.
       NameNode nn = cluster.getNameNode();
-      nn.rollEditLog();
+      NamenodeProtocols nnRpc = nn.getRpcServer();
+      nnRpc.rollEditLog();
       assertEquals(bnImage.getEditLog().getCurSegmentTxId(),
           nn.getFSImage().getEditLog().getCurSegmentTxId());

@@ -207,7 +209,9 @@ public Boolean get() {
         LOG.info("Checking for " + src + " on BN");
         try {
           boolean hasFile = backup.getNamesystem().getFileInfo(src, false) != null;
-          boolean txnIdMatch = backup.getTransactionID() == nn.getTransactionID();
+          boolean txnIdMatch =
+            backup.getRpcServer().getTransactionID() ==
+            nn.getRpcServer().getTransactionID();
           return hasFile && txnIdMatch;
         } catch (Exception e) {
           throw new RuntimeException(e);
@@ -264,7 +268,7 @@ void testCheckpoint(StartupOption op) throws Exception {
       //
       // Take a checkpoint
       //
-      long txid = cluster.getNameNode().getTransactionID();
+      long txid = cluster.getNameNodeRpc().getTransactionID();
       backup = startBackupNode(conf, op, 1);
       waitCheckpointDone(cluster, backup, txid);
     } catch(IOException e) {
@@ -300,18 +304,18 @@ void testCheckpoint(StartupOption op) throws Exception {
       // Take a checkpoint
       //
       backup = startBackupNode(conf, op, 1);
-      long txid = cluster.getNameNode().getTransactionID();
+      long txid = cluster.getNameNodeRpc().getTransactionID();
      waitCheckpointDone(cluster, backup, txid);

       for (int i = 0; i < 10; i++) {
         fileSys.mkdirs(new Path("file_" + i));
       }

-      txid = cluster.getNameNode().getTransactionID();
+      txid = cluster.getNameNodeRpc().getTransactionID();
       backup.doCheckpoint();
       waitCheckpointDone(cluster, backup, txid);

-      txid = cluster.getNameNode().getTransactionID();
+      txid = cluster.getNameNodeRpc().getTransactionID();
       backup.doCheckpoint();
       waitCheckpointDone(cluster, backup, txid);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
index 8f1af840c9..66e60b0271 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -152,7 +153,7 @@ public void testBlockCreation() throws IOException {
    */
   @Test
   public void testGetBlockLocations() throws IOException {
-    final NameNode namenode = cluster.getNameNode();
+    final NamenodeProtocols namenode = cluster.getNameNodeRpc();
     final Path p = new Path(BASE_DIR, "file2.dat");
     final String src = p.toString();
     final FSDataOutputStream out = TestFileCreation.createFile(hdfs, p, 3);
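The TestBackupNode hunks settle into one pattern: protocol-level calls go through NameNode#getRpcServer(), while image and edit-log internals stay on the NameNode object. A sketch of that split, using only calls that appear in the hunks above:

    NameNode nn = cluster.getNameNode();
    NamenodeProtocols nnRpc = nn.getRpcServer();
    nnRpc.rollEditLog();                                              // RPC-visible operation
    long txId = nnRpc.getTransactionID();                             // ditto
    long segTxId = nn.getFSImage().getEditLog().getCurSegmentTxId();  // server-side internals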
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 2a1551c6f0..c1f71a4784 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.CheckpointStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -982,11 +983,12 @@ public void testCheckpointSignature() throws IOException {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
         .format(true).build();
     NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nnRpc = nn.getRpcServer();

     SecondaryNameNode secondary = startSecondaryNameNode(conf);
     // prepare checkpoint image
     secondary.doCheckpoint();
-    CheckpointSignature sig = nn.rollEditLog();
+    CheckpointSignature sig = nnRpc.rollEditLog();
     // manipulate the CheckpointSignature fields
     sig.setBlockpoolID("somerandomebpid");
     sig.clusterID = "somerandomcid";
@@ -1073,8 +1075,10 @@ public void testMultipleSecondaryNamenodes() throws IOException {
         .nameNodePort(9928).build();
     Configuration snConf1 = new HdfsConfiguration(cluster.getConfiguration(0));
     Configuration snConf2 = new HdfsConfiguration(cluster.getConfiguration(1));
-    InetSocketAddress nn1RpcAddress = cluster.getNameNode(0).rpcAddress;
-    InetSocketAddress nn2RpcAddress = cluster.getNameNode(1).rpcAddress;
+    InetSocketAddress nn1RpcAddress =
+        cluster.getNameNode(0).getNameNodeAddress();
+    InetSocketAddress nn2RpcAddress =
+        cluster.getNameNode(1).getNameNodeAddress();
     String nn1 = nn1RpcAddress.getHostName() + ":" + nn1RpcAddress.getPort();
     String nn2 = nn2RpcAddress.getHostName() + ":" + nn2RpcAddress.getPort();

@@ -1444,9 +1448,9 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
         .format(true).build();

-    NameNode nn = cluster.getNameNode();
-    String fsName = NameNode.getHostPortString(nn.getHttpAddress());
-
+    NamenodeProtocols nn = cluster.getNameNodeRpc();
+    String fsName = NameNode.getHostPortString(
+        cluster.getNameNode().getHttpAddress());
     // Make a finalized log on the server side.
     nn.rollEditLog();
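The TestCheckpoint hunks follow the same split, and additionally stop reading the protected rpcAddress field in favor of the public accessor. A sketch (the assembly below is assumed; the individual calls are the ones shown above):

    NameNode nn = cluster.getNameNode();
    NamenodeProtocols nnRpc = nn.getRpcServer();
    InetSocketAddress rpcAddr = nn.getNameNodeAddress();   // replaces direct use of nn.rpcAddress
    String hostPort = rpcAddr.getHostName() + ":" + rpcAddr.getPort();
    CheckpointSignature sig = nnRpc.rollEditLog();          // checkpoint signature via the RPC facade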
@@ -1515,8 +1519,8 @@ public void testCheckpointWithFailedStorageDir() throws Exception {

       // Now primary NN experiences failure of a volume -- fake by
       // setting its current dir to a-x permissions
-      NameNode nn = cluster.getNameNode();
-      NNStorage storage = nn.getFSImage().getStorage();
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
       StorageDirectory sd0 = storage.getStorageDir(0);
       StorageDirectory sd1 = storage.getStorageDir(1);

@@ -1590,8 +1594,8 @@ public void testCheckpointWithSeparateDirsAfterNameFails() throws Exception {

       // Now primary NN experiences failure of its only name dir -- fake by
       // setting its current dir to a-x permissions
-      NameNode nn = cluster.getNameNode();
-      NNStorage storage = nn.getFSImage().getStorage();
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
       StorageDirectory sd0 = storage.getStorageDir(0);
       assertEquals(NameNodeDirType.IMAGE, sd0.getStorageDirType());
       currentDir = sd0.getCurrentDir();
@@ -1704,7 +1708,7 @@ public void testSecondaryHasVeryOutOfDateImage() throws IOException {
       secondary.doCheckpoint();

       // Now primary NN saves namespace 3 times
-      NameNode nn = cluster.getNameNode();
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
       nn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
       for (int i = 0; i < 3; i++) {
         nn.saveNamespace();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index c6bd56955a..bc33f175bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -103,7 +103,7 @@ public void testDeadDatanode() throws Exception {
     dn.shutdown();
     waitForDatanodeState(reg.getStorageID(), false, 20000);

-    DatanodeProtocol dnp = cluster.getNameNode();
+    DatanodeProtocol dnp = cluster.getNameNodeRpc();

     ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
         new Block(0), "") };
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index 647e47d61d..de55d88467 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -481,7 +482,7 @@ public void testFsckListCorruptFilesBlocks() throws Exception {
     }

     // wait for the namenode to see the corruption
-    final NameNode namenode = cluster.getNameNode();
+    final NamenodeProtocols namenode = cluster.getNameNodeRpc();
     CorruptFileBlocks corruptFileBlocks = namenode
       .listCorruptFileBlocks("/corruptData", null);
     int numCorrupt = corruptFileBlocks.getFiles().length;
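TestDeadDatanode and TestFsck rely on the object returned by getNameNodeRpc() serving several protocols at once. A sketch of the views this implies (the aggregation itself is only implied by the assignments above, not spelled out in these hunks):

    NamenodeProtocols rpc = cluster.getNameNodeRpc();
    ClientProtocol clientView = rpc;      // getFileInfo, setSafeMode, saveNamespace, listCorruptFileBlocks, ...
    DatanodeProtocol datanodeView = rpc;  // sendHeartbeat, blockReceivedAndDeleted, blockReport, ...
    NamenodeProtocol namenodeView = rpc;  // rollEditLog, getTransactionID, getBlocks, ...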
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
index ebd4a48ae2..65ec3b4ad2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
@@ -52,7 +53,7 @@ public class TestHDFSConcat {
   private static final short REPL_FACTOR = 2;

   private MiniDFSCluster cluster;
-  private NameNode nn;
+  private NamenodeProtocols nn;
   private DistributedFileSystem dfs;

   private static long blockSize = 512;
@@ -72,7 +73,7 @@ public void startUpCluster() throws IOException {
     cluster.waitClusterUp();
     dfs = (DistributedFileSystem) cluster.getFileSystem();
     assertNotNull("Failed to get FileSystem", dfs);
-    nn = cluster.getNameNode();
+    nn = cluster.getNameNodeRpc();
     assertNotNull("Failed to get NameNode", nn);
   }
@@ -283,7 +284,7 @@ public void testConcatNotCompleteBlock() throws IOException {
     Path filePath1 = new Path(name1);
     DFSTestUtil.createFile(dfs, filePath1, trgFileLen, REPL_FACTOR, 1);

-    HdfsFileStatus fStatus = cluster.getNameNode().getFileInfo(name1);
+    HdfsFileStatus fStatus = nn.getFileInfo(name1);
     long fileLen = fStatus.getLen();
     assertEquals(fileLen, trgFileLen);

@@ -293,11 +294,11 @@ public void testConcatNotCompleteBlock() throws IOException {
     stm.readFully(0, byteFile1);
     stm.close();

-    LocatedBlocks lb1 = cluster.getNameNode().getBlockLocations(name1, 0, trgFileLen);
+    LocatedBlocks lb1 = nn.getBlockLocations(name1, 0, trgFileLen);

     Path filePath2 = new Path(name2);
     DFSTestUtil.createFile(dfs, filePath2, srcFileLen, REPL_FACTOR, 1);
-    fStatus = cluster.getNameNode().getFileInfo(name2);
+    fStatus = nn.getFileInfo(name2);
     fileLen = fStatus.getLen();
     assertEquals(srcFileLen, fileLen);

@@ -307,7 +308,7 @@ public void testConcatNotCompleteBlock() throws IOException {
     stm.readFully(0, byteFile2);
     stm.close();

-    LocatedBlocks lb2 = cluster.getNameNode().getBlockLocations(name2, 0, srcFileLen);
+    LocatedBlocks lb2 = nn.getBlockLocations(name2, 0, srcFileLen);

     System.out.println("trg len="+trgFileLen+"; src len="+srcFileLen);

@@ -316,7 +317,7 @@ public void testConcatNotCompleteBlock() throws IOException {
     dfs.concat(filePath1, new Path [] {filePath2});

     long totalLen = trgFileLen + srcFileLen;
-    fStatus = cluster.getNameNode().getFileInfo(name1);
+    fStatus = nn.getFileInfo(name1);
     fileLen = fStatus.getLen();

     // read the resulting file
@@ -325,7 +326,7 @@ public void testConcatNotCompleteBlock() throws IOException {
     stm.readFully(0, byteFileConcat);
     stm.close();

-    LocatedBlocks lbConcat = cluster.getNameNode().getBlockLocations(name1, 0, fileLen);
+    LocatedBlocks lbConcat = nn.getBlockLocations(name1, 0, fileLen);

     //verifications
     // 1. number of blocks
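The TestHDFSConcat hunks above also retype the fixture itself: holding the protocol interface instead of the NameNode class keeps every call site uniform. A sketch of that setup, with the cluster configuration elided and the @Before wiring assumed:

    private MiniDFSCluster cluster;
    private NamenodeProtocols nn;                 // was: private NameNode nn;

    @Before
    public void startUpCluster() throws IOException {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
      cluster.waitClusterUp();
      nn = cluster.getNameNodeRpc();              // replaces cluster.getNameNode()
      assertNotNull("Failed to get NameNode", nn);
    }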
@@ -337,7 +338,7 @@ public void testConcatNotCompleteBlock() throws IOException {
     assertEquals(fileLen, totalLen);

     // 3. removal of the src file
-    fStatus = cluster.getNameNode().getFileInfo(name2);
+    fStatus = nn.getFileInfo(name2);
     assertNull("File "+name2+ "still exists", fStatus); // file shouldn't exist

     // 4. content
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index 390894cc71..c285c667aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -147,7 +147,7 @@ public void testListCorruptFileBlocksInSafeMode() throws Exception {
       conf.setFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, 0f);
       cluster = new MiniDFSCluster.Builder(conf).waitSafeMode(false).build();
-      cluster.getNameNode().
+      cluster.getNameNodeRpc().
         setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       FileSystem fs = cluster.getFileSystem();
@@ -244,7 +244,7 @@ public void testListCorruptFileBlocksInSafeMode() throws Exception {
                  cluster.getNameNode().isInSafeMode());

       // now leave safe mode so that we can clean up
-      cluster.getNameNode().
+      cluster.getNameNodeRpc().
         setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
       util.cleanup(fs, "/srcdat10");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java
index 2c369d9e4c..aad8d7dc0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java
@@ -148,8 +148,8 @@ public void testPurgingWithNameEditsDirAfterFailure()

   private static void doSaveNamespace(NameNode nn) throws IOException {
     LOG.info("Saving namespace...");
-    nn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-    nn.saveNamespace();
-    nn.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    nn.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    nn.getRpcServer().saveNamespace();
+    nn.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
   }
 }
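The doSaveNamespace() helper above captures the save-namespace idiom used throughout these tests. Restated as a sketch with a local handle (safe mode must be entered before saveNamespace can run):

    private static void doSaveNamespace(NameNode nn) throws IOException {
      LOG.info("Saving namespace...");
      NamenodeProtocols nnRpc = nn.getRpcServer();
      nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER);   // saveNamespace requires safe mode
      nnRpc.saveNamespace();
      nnRpc.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
    }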
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java
index 2338f55666..6f2aada534 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Assert;
@@ -54,7 +55,7 @@ public void tearDown() throws Exception {

   @Test
   public void testDelegationToken() throws IOException, InterruptedException {
-    NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nn = cluster.getNameNodeRpc();
     HttpServletRequest request = mock(HttpServletRequest.class);
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser("auser");
     String tokenString = NamenodeJspHelper.getDelegationToken(nn, request,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
index ed7d3fbd03..b62dcc1bd4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
@@ -108,7 +108,7 @@ public void testRestartDFS() throws Exception {
       files.cleanup(fs, dir);
       files.createFiles(fs, dir);
       fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      cluster.getNameNode().saveNamespace();
+      cluster.getNameNodeRpc().saveNamespace();
       final String checkAfterModify = checkImages(fsn, numNamenodeDirs);
       assertFalse("Modified namespace should change fsimage contents. " +
           "was: " + checkAfterRestart + " now: " + checkAfterModify,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
index 6ba2e81869..8948f7843e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.StringUtils;
@@ -379,9 +380,10 @@ public void testCompression() throws IOException {
     NameNode namenode = new NameNode(conf);
     namenode.getNamesystem().mkdirs("/test",
         new PermissionStatus("hairong", null, FsPermission.getDefault()), true);
-    assertTrue(namenode.getFileInfo("/test").isDir());
-    namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-    namenode.saveNamespace();
+    NamenodeProtocols nnRpc = namenode.getRpcServer();
+    assertTrue(nnRpc.getFileInfo("/test").isDir());
+    nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    nnRpc.saveNamespace();
     namenode.stop();
     namenode.join();

@@ -408,9 +410,10 @@ public void testCompression() throws IOException {

   private void checkNameSpace(Configuration conf) throws IOException {
     NameNode namenode = new NameNode(conf);
-    assertTrue(namenode.getFileInfo("/test").isDir());
-    namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-    namenode.saveNamespace();
+    NamenodeProtocols nnRpc = namenode.getRpcServer();
+    assertTrue(nnRpc.getFileInfo("/test").isDir());
+    nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    nnRpc.saveNamespace();
     namenode.stop();
     namenode.join();
   }
@@ -515,7 +518,7 @@ public void testNNRestart() throws IOException, InterruptedException {
     cluster.waitActive();
     cluster.restartNameNode();

-    NameNode nn = cluster.getNameNode();
+    NamenodeProtocols nn = cluster.getNameNodeRpc();
     assertNotNull(nn);
     Assert.assertTrue(cluster.isDataNodeUp());
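TestStartup exercises the same facade against a standalone NameNode rather than a MiniDFSCluster. A sketch of that shape, with the calls taken from the hunks above and the try/finally added here for clarity:

    NameNode namenode = new NameNode(conf);
    try {
      NamenodeProtocols nnRpc = namenode.getRpcServer();
      assertTrue(nnRpc.getFileInfo("/test").isDir());
      nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
      nnRpc.saveNamespace();
    } finally {
      namenode.stop();
      namenode.join();
    }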
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
index 095c153fa2..11152883c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
@@ -339,7 +339,7 @@ public void testMultipleSecondaryCheckpoint() throws IOException {

     // Simulate a 2NN beginning a checkpoint, but not finishing. This will
     // cause name1 to be restored.
-    cluster.getNameNode().rollEditLog();
+    cluster.getNameNodeRpc().rollEditLog();

     printStorages(fsImage);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java
index 94fa7c388a..45b3b02997 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java
@@ -74,9 +74,9 @@ public void testFilesInGetListingOps() throws Exception {
     createFile("/tmp1/t2", 3200, (short)3);
     createFile("/tmp2/t1", 3200, (short)3);
     createFile("/tmp2/t2", 3200, (short)3);
-    cluster.getNameNode().getListing("/tmp1", HdfsFileStatus.EMPTY_NAME, false);
+    cluster.getNameNodeRpc().getListing("/tmp1", HdfsFileStatus.EMPTY_NAME, false);
     assertCounter("FilesInGetListingOps", 2L, getMetrics(NN_METRICS));
-    cluster.getNameNode().getListing("/tmp2", HdfsFileStatus.EMPTY_NAME, false);
+    cluster.getNameNodeRpc().getListing("/tmp2", HdfsFileStatus.EMPTY_NAME, false);
     assertCounter("FilesInGetListingOps", 4L, getMetrics(NN_METRICS));
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
index bad53880eb..3c6adc2513 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
@@ -124,8 +124,8 @@ private File initFsimage() throws IOException {
       }

       // Write results to the fsimage file
-      cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      cluster.getNameNode().saveNamespace();
+      cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      cluster.getNameNodeRpc().saveNamespace();

       // Determine location of fsimage file
       orig = FSImageTestUtil.findLatestImageFile(