HDFS-3150. Add option for clients to contact DNs via hostname. Contributed by Eli Collins
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1373094 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f46b6ea55c
commit
f98d8eb291
@ -211,6 +211,8 @@ Branch-2 ( Unreleased changes )
|
|||||||
|
|
||||||
HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm)
|
HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm)
|
||||||
|
|
||||||
|
HDFS-3150. Add option for clients to contact DNs via hostname. (eli)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
|
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
|
||||||
|
@ -86,11 +86,11 @@ protected boolean removeEldestEntry(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private synchronized ClientDatanodeProtocol getDatanodeProxy(
|
private synchronized ClientDatanodeProtocol getDatanodeProxy(
|
||||||
DatanodeInfo node, Configuration conf, int socketTimeout)
|
DatanodeInfo node, Configuration conf, int socketTimeout,
|
||||||
throws IOException {
|
boolean connectToDnViaHostname) throws IOException {
|
||||||
if (proxy == null) {
|
if (proxy == null) {
|
||||||
proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
|
proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
|
||||||
socketTimeout);
|
socketTimeout, connectToDnViaHostname);
|
||||||
}
|
}
|
||||||
return proxy;
|
return proxy;
|
||||||
}
|
}
|
||||||
@ -156,14 +156,16 @@ private void removeBlockLocalPathInfo(ExtendedBlock b) {
|
|||||||
*/
|
*/
|
||||||
static BlockReaderLocal newBlockReader(Configuration conf, String file,
|
static BlockReaderLocal newBlockReader(Configuration conf, String file,
|
||||||
ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
|
ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
|
||||||
int socketTimeout, long startOffset, long length) throws IOException {
|
int socketTimeout, long startOffset, long length,
|
||||||
|
boolean connectToDnViaHostname) throws IOException {
|
||||||
|
|
||||||
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
|
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
|
||||||
.getIpcPort());
|
.getIpcPort());
|
||||||
// check the cache first
|
// check the cache first
|
||||||
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
|
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
|
||||||
if (pathinfo == null) {
|
if (pathinfo == null) {
|
||||||
pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
|
pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
|
||||||
|
connectToDnViaHostname);
|
||||||
}
|
}
|
||||||
|
|
||||||
// check to see if the file exists. It may so happen that the
|
// check to see if the file exists. It may so happen that the
|
||||||
@ -241,11 +243,12 @@ private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
|
|||||||
|
|
||||||
private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
|
private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
|
||||||
DatanodeInfo node, Configuration conf, int timeout,
|
DatanodeInfo node, Configuration conf, int timeout,
|
||||||
Token<BlockTokenIdentifier> token) throws IOException {
|
Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
|
||||||
|
throws IOException {
|
||||||
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
|
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
|
||||||
BlockLocalPathInfo pathinfo = null;
|
BlockLocalPathInfo pathinfo = null;
|
||||||
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
|
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
|
||||||
conf, timeout);
|
conf, timeout, connectToDnViaHostname);
|
||||||
try {
|
try {
|
||||||
// make RPC to local datanode to find local pathnames of blocks
|
// make RPC to local datanode to find local pathnames of blocks
|
||||||
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
|
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
|
||||||
|
@ -49,6 +49,8 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
@ -213,6 +215,7 @@ static class Conf {
|
|||||||
final String taskId;
|
final String taskId;
|
||||||
final FsPermission uMask;
|
final FsPermission uMask;
|
||||||
final boolean useLegacyBlockReader;
|
final boolean useLegacyBlockReader;
|
||||||
|
final boolean connectToDnViaHostname;
|
||||||
|
|
||||||
Conf(Configuration conf) {
|
Conf(Configuration conf) {
|
||||||
maxFailoverAttempts = conf.getInt(
|
maxFailoverAttempts = conf.getInt(
|
||||||
@ -263,6 +266,8 @@ static class Conf {
|
|||||||
useLegacyBlockReader = conf.getBoolean(
|
useLegacyBlockReader = conf.getBoolean(
|
||||||
DFS_CLIENT_USE_LEGACY_BLOCKREADER,
|
DFS_CLIENT_USE_LEGACY_BLOCKREADER,
|
||||||
DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
|
DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
|
||||||
|
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
||||||
|
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getChecksumType(Configuration conf) {
|
private int getChecksumType(Configuration conf) {
|
||||||
@ -473,6 +478,14 @@ String getClientName() {
|
|||||||
return clientName;
|
return clientName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return whether the client should use hostnames instead of IPs
|
||||||
|
* when connecting to DataNodes
|
||||||
|
*/
|
||||||
|
boolean connectToDnViaHostname() {
|
||||||
|
return dfsClientConf.connectToDnViaHostname;
|
||||||
|
}
|
||||||
|
|
||||||
void checkOpen() throws IOException {
|
void checkOpen() throws IOException {
|
||||||
if (!clientRunning) {
|
if (!clientRunning) {
|
||||||
IOException result = new IOException("Filesystem closed");
|
IOException result = new IOException("Filesystem closed");
|
||||||
@ -729,12 +742,12 @@ public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
|||||||
*/
|
*/
|
||||||
static BlockReader getLocalBlockReader(Configuration conf,
|
static BlockReader getLocalBlockReader(Configuration conf,
|
||||||
String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
|
String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
|
||||||
DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
|
DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
|
||||||
throws InvalidToken, IOException {
|
boolean connectToDnViaHostname) throws InvalidToken, IOException {
|
||||||
try {
|
try {
|
||||||
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
|
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
|
||||||
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
|
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
|
||||||
- offsetIntoBlock);
|
- offsetIntoBlock, connectToDnViaHostname);
|
||||||
} catch (RemoteException re) {
|
} catch (RemoteException re) {
|
||||||
throw re.unwrapRemoteException(InvalidToken.class,
|
throw re.unwrapRemoteException(InvalidToken.class,
|
||||||
AccessControlException.class);
|
AccessControlException.class);
|
||||||
@ -1425,7 +1438,8 @@ public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
|
|||||||
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
|
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
return getFileChecksum(src, namenode, socketFactory,
|
return getFileChecksum(src, namenode, socketFactory,
|
||||||
dfsClientConf.socketTimeout, getDataEncryptionKey());
|
dfsClientConf.socketTimeout, getDataEncryptionKey(),
|
||||||
|
dfsClientConf.connectToDnViaHostname);
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@ -1471,7 +1485,8 @@ public DataEncryptionKey getDataEncryptionKey()
|
|||||||
*/
|
*/
|
||||||
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
|
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
|
||||||
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
|
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
|
||||||
DataEncryptionKey encryptionKey) throws IOException {
|
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
|
||||||
|
throws IOException {
|
||||||
//get all block locations
|
//get all block locations
|
||||||
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
|
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
|
||||||
if (null == blockLocations) {
|
if (null == blockLocations) {
|
||||||
@ -1509,9 +1524,11 @@ public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
|
|||||||
try {
|
try {
|
||||||
//connect to a datanode
|
//connect to a datanode
|
||||||
sock = socketFactory.createSocket();
|
sock = socketFactory.createSocket();
|
||||||
NetUtils.connect(sock,
|
String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
|
||||||
NetUtils.createSocketAddr(datanodes[j].getXferAddr()),
|
if (LOG.isDebugEnabled()) {
|
||||||
timeout);
|
LOG.debug("Connecting to datanode " + dnAddr);
|
||||||
|
}
|
||||||
|
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
|
||||||
sock.setSoTimeout(timeout);
|
sock.setSoTimeout(timeout);
|
||||||
|
|
||||||
OutputStream unbufOut = NetUtils.getOutputStream(sock);
|
OutputStream unbufOut = NetUtils.getOutputStream(sock);
|
||||||
|
@ -52,6 +52,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
|
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
|
||||||
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
|
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
|
||||||
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
|
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
|
||||||
|
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
|
||||||
|
public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
|
||||||
|
|
||||||
// HA related configuration
|
// HA related configuration
|
||||||
public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
|
public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
|
||||||
@ -81,6 +83,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
|
public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
|
||||||
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
|
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
|
||||||
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
|
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
|
||||||
|
public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
|
||||||
|
public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
|
||||||
|
|
||||||
public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
|
public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
|
||||||
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
|
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
|
||||||
|
@ -199,7 +199,8 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
cdp = DFSUtil.createClientDatanodeProtocolProxy(
|
cdp = DFSUtil.createClientDatanodeProtocolProxy(
|
||||||
datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
|
datanode, dfsClient.conf, dfsClient.getConf().socketTimeout,
|
||||||
|
dfsClient.getConf().connectToDnViaHostname, locatedblock);
|
||||||
|
|
||||||
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
|
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
|
||||||
|
|
||||||
@ -716,8 +717,12 @@ private DNAddrPair chooseDataNode(LocatedBlock block)
|
|||||||
DatanodeInfo[] nodes = block.getLocations();
|
DatanodeInfo[] nodes = block.getLocations();
|
||||||
try {
|
try {
|
||||||
DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
|
DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
|
||||||
InetSocketAddress targetAddr =
|
final String dnAddr =
|
||||||
NetUtils.createSocketAddr(chosenNode.getXferAddr());
|
chosenNode.getXferAddr(dfsClient.connectToDnViaHostname());
|
||||||
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
|
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
|
||||||
|
}
|
||||||
|
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
||||||
return new DNAddrPair(chosenNode, targetAddr);
|
return new DNAddrPair(chosenNode, targetAddr);
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
String blockInfo = block.getBlock() + " file=" + src;
|
String blockInfo = block.getBlock() + " file=" + src;
|
||||||
@ -875,7 +880,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
|
|||||||
|
|
||||||
if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
|
if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
|
||||||
return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
|
return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
|
||||||
blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset);
|
blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
|
||||||
|
dfsClient.connectToDnViaHostname());
|
||||||
}
|
}
|
||||||
|
|
||||||
IOException err = null;
|
IOException err = null;
|
||||||
@ -1183,7 +1189,7 @@ static DatanodeInfo bestNode(DatanodeInfo nodes[],
|
|||||||
throw new IOException("No live nodes contain current block");
|
throw new IOException("No live nodes contain current block");
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Utility class to encapsulate data node info and its ip address. */
|
/** Utility class to encapsulate data node info and its address. */
|
||||||
static class DNAddrPair {
|
static class DNAddrPair {
|
||||||
DatanodeInfo info;
|
DatanodeInfo info;
|
||||||
InetSocketAddress addr;
|
InetSocketAddress addr;
|
||||||
|
@ -1100,7 +1100,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
|
|||||||
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
|
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
|
||||||
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
|
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
|
||||||
+ "encryption key was invalid when connecting to "
|
+ "encryption key was invalid when connecting to "
|
||||||
+ nodes[0].getXferAddr() + " : " + ie);
|
+ nodes[0] + " : " + ie);
|
||||||
// The encryption key used is invalid.
|
// The encryption key used is invalid.
|
||||||
refetchEncryptionKey--;
|
refetchEncryptionKey--;
|
||||||
dfsClient.clearDataEncryptionKey();
|
dfsClient.clearDataEncryptionKey();
|
||||||
@ -1112,7 +1112,8 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
|
|||||||
// find the datanode that matches
|
// find the datanode that matches
|
||||||
if (firstBadLink.length() != 0) {
|
if (firstBadLink.length() != 0) {
|
||||||
for (int i = 0; i < nodes.length; i++) {
|
for (int i = 0; i < nodes.length; i++) {
|
||||||
if (nodes[i].getXferAddr().equals(firstBadLink)) {
|
// NB: Unconditionally using the xfer addr w/o hostname
|
||||||
|
if (firstBadLink.equals(nodes[i].getXferAddr())) {
|
||||||
errorIndex = i;
|
errorIndex = i;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1216,11 +1217,11 @@ private void setLastException(IOException e) {
|
|||||||
*/
|
*/
|
||||||
static Socket createSocketForPipeline(final DatanodeInfo first,
|
static Socket createSocketForPipeline(final DatanodeInfo first,
|
||||||
final int length, final DFSClient client) throws IOException {
|
final int length, final DFSClient client) throws IOException {
|
||||||
|
final String dnAddr = first.getXferAddr(client.connectToDnViaHostname());
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
DFSClient.LOG.debug("Connecting to datanode " + first);
|
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
|
||||||
}
|
}
|
||||||
final InetSocketAddress isa =
|
final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
|
||||||
NetUtils.createSocketAddr(first.getXferAddr());
|
|
||||||
final Socket sock = client.socketFactory.createSocket();
|
final Socket sock = client.socketFactory.createSocket();
|
||||||
final int timeout = client.getDatanodeReadTimeout(length);
|
final int timeout = client.getDatanodeReadTimeout(length);
|
||||||
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout);
|
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout);
|
||||||
|
@ -841,17 +841,17 @@ public static int roundBytesToGB(long bytes) {
|
|||||||
/** Create a {@link ClientDatanodeProtocol} proxy */
|
/** Create a {@link ClientDatanodeProtocol} proxy */
|
||||||
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
||||||
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
||||||
LocatedBlock locatedBlock) throws IOException {
|
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
|
||||||
return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
|
return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
|
||||||
locatedBlock);
|
connectToDnViaHostname, locatedBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
|
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
|
||||||
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
||||||
DatanodeID datanodeid, Configuration conf, int socketTimeout)
|
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
||||||
throws IOException {
|
boolean connectToDnViaHostname) throws IOException {
|
||||||
return new ClientDatanodeProtocolTranslatorPB(
|
return new ClientDatanodeProtocolTranslatorPB(
|
||||||
datanodeid, conf, socketTimeout);
|
datanodeid, conf, socketTimeout, connectToDnViaHostname);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a {@link ClientDatanodeProtocol} proxy */
|
/** Create a {@link ClientDatanodeProtocol} proxy */
|
||||||
|
@ -104,7 +104,7 @@ public String getXferAddr() {
|
|||||||
/**
|
/**
|
||||||
* @return IP:ipcPort string
|
* @return IP:ipcPort string
|
||||||
*/
|
*/
|
||||||
public String getIpcAddr() {
|
private String getIpcAddr() {
|
||||||
return ipAddr + ":" + ipcPort;
|
return ipAddr + ":" + ipcPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,6 +122,29 @@ public String getXferAddrWithHostname() {
|
|||||||
return hostName + ":" + xferPort;
|
return hostName + ":" + xferPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return hostname:ipcPort
|
||||||
|
*/
|
||||||
|
private String getIpcAddrWithHostname() {
|
||||||
|
return hostName + ":" + ipcPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param useHostname true to use the DN hostname, use the IP otherwise
|
||||||
|
* @return name:xferPort
|
||||||
|
*/
|
||||||
|
public String getXferAddr(boolean useHostname) {
|
||||||
|
return useHostname ? getXferAddrWithHostname() : getXferAddr();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param useHostname true to use the DN hostname, use the IP otherwise
|
||||||
|
* @return name:ipcPort
|
||||||
|
*/
|
||||||
|
public String getIpcAddr(boolean useHostname) {
|
||||||
|
return useHostname ? getIpcAddrWithHostname() : getIpcAddr();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return data storage ID.
|
* @return data storage ID.
|
||||||
*/
|
*/
|
||||||
|
@ -73,10 +73,10 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
|||||||
RefreshNamenodesRequestProto.newBuilder().build();
|
RefreshNamenodesRequestProto.newBuilder().build();
|
||||||
|
|
||||||
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
|
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
|
||||||
Configuration conf, int socketTimeout, LocatedBlock locatedBlock)
|
Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
|
||||||
throws IOException {
|
LocatedBlock locatedBlock) throws IOException {
|
||||||
rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf,
|
rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf,
|
||||||
socketTimeout, locatedBlock);
|
socketTimeout, connectToDnViaHostname, locatedBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
|
public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
|
||||||
@ -90,11 +90,17 @@ public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
|
|||||||
* @param datanodeid Datanode to connect to.
|
* @param datanodeid Datanode to connect to.
|
||||||
* @param conf Configuration.
|
* @param conf Configuration.
|
||||||
* @param socketTimeout Socket timeout to use.
|
* @param socketTimeout Socket timeout to use.
|
||||||
|
* @param connectToDnViaHostname connect to the Datanode using its hostname
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
|
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
|
||||||
Configuration conf, int socketTimeout) throws IOException {
|
Configuration conf, int socketTimeout, boolean connectToDnViaHostname)
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
|
throws IOException {
|
||||||
|
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
|
||||||
|
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
|
||||||
|
}
|
||||||
rpcProxy = createClientDatanodeProtocolProxy(addr,
|
rpcProxy = createClientDatanodeProtocolProxy(addr,
|
||||||
UserGroupInformation.getCurrentUser(), conf,
|
UserGroupInformation.getCurrentUser(), conf,
|
||||||
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
|
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
|
||||||
@ -102,10 +108,11 @@ public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
|
|||||||
|
|
||||||
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
|
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
|
||||||
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
||||||
LocatedBlock locatedBlock) throws IOException {
|
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
|
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
|
||||||
|
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("ClientDatanodeProtocol addr=" + addr);
|
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Since we're creating a new UserGroupInformation here, we know that no
|
// Since we're creating a new UserGroupInformation here, we know that no
|
||||||
|
@ -55,7 +55,7 @@ class DNConf {
|
|||||||
final boolean dropCacheBehindReads;
|
final boolean dropCacheBehindReads;
|
||||||
final boolean syncOnClose;
|
final boolean syncOnClose;
|
||||||
final boolean encryptDataTransfer;
|
final boolean encryptDataTransfer;
|
||||||
|
final boolean connectToDnViaHostname;
|
||||||
|
|
||||||
final long readaheadLength;
|
final long readaheadLength;
|
||||||
final long heartBeatInterval;
|
final long heartBeatInterval;
|
||||||
@ -97,7 +97,9 @@ public DNConf(Configuration conf) {
|
|||||||
dropCacheBehindReads = conf.getBoolean(
|
dropCacheBehindReads = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
|
||||||
|
connectToDnViaHostname = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
|
||||||
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||||
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
|
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
|
||||||
|
|
||||||
|
@ -276,6 +276,7 @@ public static InetSocketAddress createSocketAddr(String target) {
|
|||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
private final String userWithLocalPathAccess;
|
private final String userWithLocalPathAccess;
|
||||||
|
private boolean connectToDnViaHostname;
|
||||||
ReadaheadPool readaheadPool;
|
ReadaheadPool readaheadPool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -296,8 +297,11 @@ public static InetSocketAddress createSocketAddr(String target) {
|
|||||||
final SecureResources resources) throws IOException {
|
final SecureResources resources) throws IOException {
|
||||||
super(conf);
|
super(conf);
|
||||||
|
|
||||||
this.userWithLocalPathAccess = conf
|
this.userWithLocalPathAccess =
|
||||||
.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
|
conf.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
|
||||||
|
this.connectToDnViaHostname = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
|
||||||
try {
|
try {
|
||||||
hostName = getHostName(conf);
|
hostName = getHostName(conf);
|
||||||
LOG.info("Configured hostname is " + hostName);
|
LOG.info("Configured hostname is " + hostName);
|
||||||
@ -878,7 +882,7 @@ public String getDisplayName() {
|
|||||||
/**
|
/**
|
||||||
* NB: The datanode can perform data transfer on the streaming
|
* NB: The datanode can perform data transfer on the streaming
|
||||||
* address however clients are given the IPC IP address for data
|
* address however clients are given the IPC IP address for data
|
||||||
* transfer, and that may be be a different address.
|
* transfer, and that may be a different address.
|
||||||
*
|
*
|
||||||
* @return socket address for data transfer
|
* @return socket address for data transfer
|
||||||
*/
|
*/
|
||||||
@ -925,12 +929,12 @@ DatanodeProtocolClientSideTranslatorPB connectToNN(
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
|
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
|
||||||
DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
|
DatanodeID datanodeid, final Configuration conf, final int socketTimeout,
|
||||||
throws IOException {
|
final boolean connectToDnViaHostname) throws IOException {
|
||||||
final InetSocketAddress addr =
|
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
|
||||||
NetUtils.createSocketAddr(datanodeid.getIpcAddr());
|
final InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
|
||||||
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
|
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
|
||||||
}
|
}
|
||||||
final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
|
final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
|
||||||
try {
|
try {
|
||||||
@ -1386,8 +1390,11 @@ public void run() {
|
|||||||
final boolean isClient = clientname.length() > 0;
|
final boolean isClient = clientname.length() > 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
InetSocketAddress curTarget =
|
final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
|
||||||
NetUtils.createSocketAddr(targets[0].getXferAddr());
|
InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Connecting to datanode " + dnAddr);
|
||||||
|
}
|
||||||
sock = newSocket();
|
sock = newSocket();
|
||||||
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
|
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
|
||||||
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
|
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
|
||||||
@ -1843,7 +1850,7 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
|
|||||||
DatanodeRegistration bpReg = bpos.bpRegistration;
|
DatanodeRegistration bpReg = bpos.bpRegistration;
|
||||||
InterDatanodeProtocol datanode = bpReg.equals(id)?
|
InterDatanodeProtocol datanode = bpReg.equals(id)?
|
||||||
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
|
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
|
||||||
dnConf.socketTimeout);
|
dnConf.socketTimeout, dnConf.connectToDnViaHostname);
|
||||||
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
|
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
|
||||||
if (info != null &&
|
if (info != null &&
|
||||||
info.getGenerationStamp() >= block.getGenerationStamp() &&
|
info.getGenerationStamp() >= block.getGenerationStamp() &&
|
||||||
|
@ -86,7 +86,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|||||||
private final DataNode datanode;
|
private final DataNode datanode;
|
||||||
private final DNConf dnConf;
|
private final DNConf dnConf;
|
||||||
private final DataXceiverServer dataXceiverServer;
|
private final DataXceiverServer dataXceiverServer;
|
||||||
|
private final boolean connectToDnViaHostname;
|
||||||
private long opStartTime; //the start time of receiving an Op
|
private long opStartTime; //the start time of receiving an Op
|
||||||
private final SocketInputWrapper socketIn;
|
private final SocketInputWrapper socketIn;
|
||||||
private OutputStream socketOut;
|
private OutputStream socketOut;
|
||||||
@ -113,6 +113,7 @@ private DataXceiver(Socket s,
|
|||||||
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
|
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
this.dataXceiverServer = dataXceiverServer;
|
this.dataXceiverServer = dataXceiverServer;
|
||||||
|
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
|
||||||
remoteAddress = s.getRemoteSocketAddress().toString();
|
remoteAddress = s.getRemoteSocketAddress().toString();
|
||||||
localAddress = s.getLocalSocketAddress().toString();
|
localAddress = s.getLocalSocketAddress().toString();
|
||||||
|
|
||||||
@ -404,7 +405,10 @@ public void writeBlock(final ExtendedBlock block,
|
|||||||
if (targets.length > 0) {
|
if (targets.length > 0) {
|
||||||
InetSocketAddress mirrorTarget = null;
|
InetSocketAddress mirrorTarget = null;
|
||||||
// Connect to backup machine
|
// Connect to backup machine
|
||||||
mirrorNode = targets[0].getXferAddr();
|
mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Connecting to datanode " + mirrorNode);
|
||||||
|
}
|
||||||
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
|
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
|
||||||
mirrorSock = datanode.newSocket();
|
mirrorSock = datanode.newSocket();
|
||||||
try {
|
try {
|
||||||
@ -457,7 +461,8 @@ public void writeBlock(final ExtendedBlock block,
|
|||||||
if (isClient) {
|
if (isClient) {
|
||||||
BlockOpResponseProto.newBuilder()
|
BlockOpResponseProto.newBuilder()
|
||||||
.setStatus(ERROR)
|
.setStatus(ERROR)
|
||||||
.setFirstBadLink(mirrorNode)
|
// NB: Unconditionally using the xfer addr w/o hostname
|
||||||
|
.setFirstBadLink(targets[0].getXferAddr())
|
||||||
.build()
|
.build()
|
||||||
.writeDelimitedTo(replyOut);
|
.writeDelimitedTo(replyOut);
|
||||||
replyOut.flush();
|
replyOut.flush();
|
||||||
@ -729,8 +734,11 @@ public void replaceBlock(final ExtendedBlock block,
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// get the output stream to the proxy
|
// get the output stream to the proxy
|
||||||
InetSocketAddress proxyAddr =
|
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
|
||||||
NetUtils.createSocketAddr(proxySource.getXferAddr());
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Connecting to datanode " + dnAddr);
|
||||||
|
}
|
||||||
|
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
|
||||||
proxySock = datanode.newSocket();
|
proxySock = datanode.newSocket();
|
||||||
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
|
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
|
||||||
proxySock.setSoTimeout(dnConf.socketTimeout);
|
proxySock.setSoTimeout(dnConf.socketTimeout);
|
||||||
@ -891,6 +899,7 @@ private void checkAccess(DataOutputStream out, final boolean reply,
|
|||||||
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
|
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
|
||||||
DatanodeRegistration dnR =
|
DatanodeRegistration dnR =
|
||||||
datanode.getDNRegistrationForBP(blk.getBlockPoolId());
|
datanode.getDNRegistrationForBP(blk.getBlockPoolId());
|
||||||
|
// NB: Unconditionally using the xfer addr w/o hostname
|
||||||
resp.setFirstBadLink(dnR.getXferAddr());
|
resp.setFirstBadLink(dnR.getXferAddr());
|
||||||
}
|
}
|
||||||
resp.build().writeDelimitedTo(out);
|
resp.build().writeDelimitedTo(out);
|
||||||
|
@ -127,7 +127,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response
|
|||||||
datanode, conf, getUGI(request, conf));
|
datanode, conf, getUGI(request, conf));
|
||||||
final ClientProtocol nnproxy = dfs.getNamenode();
|
final ClientProtocol nnproxy = dfs.getNamenode();
|
||||||
final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
|
final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
|
||||||
path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey());
|
path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false);
|
||||||
MD5MD5CRC32FileChecksum.write(xml, checksum);
|
MD5MD5CRC32FileChecksum.write(xml, checksum);
|
||||||
} catch(IOException ioe) {
|
} catch(IOException ioe) {
|
||||||
writeXml(ioe, path, xml);
|
writeXml(ioe, path, xml);
|
||||||
|
@ -53,7 +53,7 @@
|
|||||||
<name>dfs.datanode.address</name>
|
<name>dfs.datanode.address</name>
|
||||||
<value>0.0.0.0:50010</value>
|
<value>0.0.0.0:50010</value>
|
||||||
<description>
|
<description>
|
||||||
The address where the datanode server will listen to.
|
The datanode server address and port for data transfer.
|
||||||
If the port is 0 then the server will start on a free port.
|
If the port is 0 then the server will start on a free port.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
@ -920,6 +920,22 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.client.use.datanode.hostname</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>Whether clients should use datanode hostnames when
|
||||||
|
connecting to datanodes.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.use.datanode.hostname</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>Whether datanodes should use datanode hostnames when
|
||||||
|
connecting to other datanodes for data transfer.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.client.local.interfaces</name>
|
<name>dfs.client.local.interfaces</name>
|
||||||
<value></value>
|
<value></value>
|
||||||
|
@ -145,6 +145,7 @@ public static class Builder {
|
|||||||
private boolean setupHostsFile = false;
|
private boolean setupHostsFile = false;
|
||||||
private MiniDFSNNTopology nnTopology = null;
|
private MiniDFSNNTopology nnTopology = null;
|
||||||
private boolean checkExitOnShutdown = true;
|
private boolean checkExitOnShutdown = true;
|
||||||
|
private boolean checkDataNodeHostConfig = false;
|
||||||
|
|
||||||
public Builder(Configuration conf) {
|
public Builder(Configuration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
@ -262,6 +263,14 @@ public Builder checkExitOnShutdown(boolean val) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default: false
|
||||||
|
*/
|
||||||
|
public Builder checkDataNodeHostConfig(boolean val) {
|
||||||
|
this.checkDataNodeHostConfig = val;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default: null
|
* Default: null
|
||||||
*/
|
*/
|
||||||
@ -326,7 +335,8 @@ private MiniDFSCluster(Builder builder) throws IOException {
|
|||||||
builder.waitSafeMode,
|
builder.waitSafeMode,
|
||||||
builder.setupHostsFile,
|
builder.setupHostsFile,
|
||||||
builder.nnTopology,
|
builder.nnTopology,
|
||||||
builder.checkExitOnShutdown);
|
builder.checkExitOnShutdown,
|
||||||
|
builder.checkDataNodeHostConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
public class DataNodeProperties {
|
public class DataNodeProperties {
|
||||||
@ -563,7 +573,7 @@ public MiniDFSCluster(int nameNodePort,
|
|||||||
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
|
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
|
||||||
operation, racks, hosts,
|
operation, racks, hosts,
|
||||||
simulatedCapacities, null, true, false,
|
simulatedCapacities, null, true, false,
|
||||||
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true);
|
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initMiniDFSCluster(
|
private void initMiniDFSCluster(
|
||||||
@ -573,7 +583,8 @@ private void initMiniDFSCluster(
|
|||||||
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
|
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
|
||||||
String[] hosts, long[] simulatedCapacities, String clusterId,
|
String[] hosts, long[] simulatedCapacities, String clusterId,
|
||||||
boolean waitSafeMode, boolean setupHostsFile,
|
boolean waitSafeMode, boolean setupHostsFile,
|
||||||
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown)
|
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
|
||||||
|
boolean checkDataNodeHostConfig)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ExitUtil.disableSystemExit();
|
ExitUtil.disableSystemExit();
|
||||||
|
|
||||||
@ -630,7 +641,7 @@ private void initMiniDFSCluster(
|
|||||||
|
|
||||||
// Start the DataNodes
|
// Start the DataNodes
|
||||||
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
|
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
|
||||||
hosts, simulatedCapacities, setupHostsFile);
|
hosts, simulatedCapacities, setupHostsFile, false, checkDataNodeHostConfig);
|
||||||
waitClusterUp();
|
waitClusterUp();
|
||||||
//make sure ProxyUsers uses the latest conf
|
//make sure ProxyUsers uses the latest conf
|
||||||
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
||||||
@ -982,7 +993,21 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
long[] simulatedCapacities,
|
long[] simulatedCapacities,
|
||||||
boolean setupHostsFile) throws IOException {
|
boolean setupHostsFile) throws IOException {
|
||||||
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
|
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
|
||||||
simulatedCapacities, setupHostsFile, false);
|
simulatedCapacities, setupHostsFile, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see MiniDFSCluster#startDataNodes(Configuration, int, boolean, StartupOption,
|
||||||
|
* String[], String[], long[], boolean, boolean, boolean)
|
||||||
|
*/
|
||||||
|
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
||||||
|
boolean manageDfsDirs, StartupOption operation,
|
||||||
|
String[] racks, String[] hosts,
|
||||||
|
long[] simulatedCapacities,
|
||||||
|
boolean setupHostsFile,
|
||||||
|
boolean checkDataNodeAddrConfig) throws IOException {
|
||||||
|
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
|
||||||
|
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1008,6 +1033,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
* @param simulatedCapacities array of capacities of the simulated data nodes
|
* @param simulatedCapacities array of capacities of the simulated data nodes
|
||||||
* @param setupHostsFile add new nodes to dfs hosts files
|
* @param setupHostsFile add new nodes to dfs hosts files
|
||||||
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
|
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
|
||||||
|
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
|
||||||
*
|
*
|
||||||
* @throws IllegalStateException if NameNode has been shutdown
|
* @throws IllegalStateException if NameNode has been shutdown
|
||||||
*/
|
*/
|
||||||
@ -1016,11 +1042,16 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
String[] racks, String[] hosts,
|
String[] racks, String[] hosts,
|
||||||
long[] simulatedCapacities,
|
long[] simulatedCapacities,
|
||||||
boolean setupHostsFile,
|
boolean setupHostsFile,
|
||||||
boolean checkDataNodeAddrConfig) throws IOException {
|
boolean checkDataNodeAddrConfig,
|
||||||
|
boolean checkDataNodeHostConfig) throws IOException {
|
||||||
if (operation == StartupOption.RECOVER) {
|
if (operation == StartupOption.RECOVER) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (checkDataNodeHostConfig) {
|
||||||
|
conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
|
||||||
|
} else {
|
||||||
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
|
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
|
||||||
|
}
|
||||||
|
|
||||||
int curDatanodesNum = dataNodes.size();
|
int curDatanodesNum = dataNodes.size();
|
||||||
// for mincluster's the default initialDelay for BRs is 0
|
// for mincluster's the default initialDelay for BRs is 0
|
||||||
|
@ -768,7 +768,7 @@ public void testClientDNProtocolTimeout() throws IOException {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
proxy = DFSUtil.createClientDatanodeProtocolProxy(
|
proxy = DFSUtil.createClientDatanodeProtocolProxy(
|
||||||
fakeDnId, conf, 500, fakeBlock);
|
fakeDnId, conf, 500, false, fakeBlock);
|
||||||
|
|
||||||
proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1));
|
proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1));
|
||||||
fail ("Did not get expected exception: SocketTimeoutException");
|
fail ("Did not get expected exception: SocketTimeoutException");
|
||||||
|
@ -417,7 +417,6 @@ public void testFileChecksum() throws Exception {
|
|||||||
|
|
||||||
final Configuration conf = getTestConfiguration();
|
final Configuration conf = getTestConfiguration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||||
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
|
|
||||||
|
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
final FileSystem hdfs = cluster.getFileSystem();
|
final FileSystem hdfs = cluster.getFileSystem();
|
||||||
|
@ -171,7 +171,14 @@ public void testServerDefaults() throws IOException {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFileCreation() throws IOException {
|
public void testFileCreation() throws IOException {
|
||||||
checkFileCreation(null);
|
checkFileCreation(null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Same test but the client should use DN hostnames */
|
||||||
|
@Test
|
||||||
|
public void testFileCreationUsingHostname() throws IOException {
|
||||||
|
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
|
||||||
|
checkFileCreation(null, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Same test but the client should bind to a local interface */
|
/** Same test but the client should bind to a local interface */
|
||||||
@ -180,10 +187,10 @@ public void testFileCreationSetLocalInterface() throws IOException {
|
|||||||
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
|
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
|
||||||
|
|
||||||
// The mini cluster listens on the loopback so we can use it here
|
// The mini cluster listens on the loopback so we can use it here
|
||||||
checkFileCreation("lo");
|
checkFileCreation("lo", false);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
checkFileCreation("bogus-interface");
|
checkFileCreation("bogus-interface", false);
|
||||||
fail("Able to specify a bogus interface");
|
fail("Able to specify a bogus interface");
|
||||||
} catch (UnknownHostException e) {
|
} catch (UnknownHostException e) {
|
||||||
assertEquals("No such interface bogus-interface", e.getMessage());
|
assertEquals("No such interface bogus-interface", e.getMessage());
|
||||||
@ -193,16 +200,28 @@ public void testFileCreationSetLocalInterface() throws IOException {
|
|||||||
/**
|
/**
|
||||||
* Test if file creation and disk space consumption works right
|
* Test if file creation and disk space consumption works right
|
||||||
* @param netIf the local interface, if any, clients should use to access DNs
|
* @param netIf the local interface, if any, clients should use to access DNs
|
||||||
|
* @param useDnHostname whether the client should contact DNs by hostname
|
||||||
*/
|
*/
|
||||||
public void checkFileCreation(String netIf) throws IOException {
|
public void checkFileCreation(String netIf, boolean useDnHostname)
|
||||||
|
throws IOException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
if (netIf != null) {
|
if (netIf != null) {
|
||||||
conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf);
|
conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf);
|
||||||
}
|
}
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, useDnHostname);
|
||||||
|
if (useDnHostname) {
|
||||||
|
// Since the mini cluster only listens on the loopback we have to
|
||||||
|
// ensure the hostname used to access DNs maps to the loopback. We
|
||||||
|
// do this by telling the DN to advertise localhost as its hostname
|
||||||
|
// instead of the default hostname.
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
|
||||||
|
}
|
||||||
if (simulatedStorage) {
|
if (simulatedStorage) {
|
||||||
SimulatedFSDataset.setFactory(conf);
|
SimulatedFSDataset.setFactory(conf);
|
||||||
}
|
}
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.checkDataNodeHostConfig(true)
|
||||||
|
.build();
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -92,7 +92,6 @@ public static void setUp() throws IOException {
|
|||||||
RAN.setSeed(seed);
|
RAN.setSeed(seed);
|
||||||
|
|
||||||
config = new Configuration();
|
config = new Configuration();
|
||||||
config.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
|
|
||||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
|
cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
|
||||||
hdfs = cluster.getFileSystem();
|
hdfs = cluster.getFileSystem();
|
||||||
blockPoolId = cluster.getNamesystem().getBlockPoolId();
|
blockPoolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
@ -41,6 +42,7 @@ public class TestMiniDFSCluster {
|
|||||||
private static final String CLUSTER_2 = "cluster2";
|
private static final String CLUSTER_2 = "cluster2";
|
||||||
private static final String CLUSTER_3 = "cluster3";
|
private static final String CLUSTER_3 = "cluster3";
|
||||||
private static final String CLUSTER_4 = "cluster4";
|
private static final String CLUSTER_4 = "cluster4";
|
||||||
|
private static final String CLUSTER_5 = "cluster5";
|
||||||
protected String testDataPath;
|
protected String testDataPath;
|
||||||
protected File testDataDir;
|
protected File testDataDir;
|
||||||
@Before
|
@Before
|
||||||
@ -125,4 +127,25 @@ public void testIsClusterUpAfterShutdown() throws Throwable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** MiniDFSCluster should not clobber dfs.datanode.hostname if requested */
|
||||||
|
@Test(timeout=100000)
|
||||||
|
public void testClusterSetDatanodeHostname() throws Throwable {
|
||||||
|
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "MYHOST");
|
||||||
|
File testDataCluster5 = new File(testDataPath, CLUSTER_5);
|
||||||
|
String c5Path = testDataCluster5.getAbsolutePath();
|
||||||
|
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, c5Path);
|
||||||
|
MiniDFSCluster cluster5 = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(1)
|
||||||
|
.checkDataNodeHostConfig(true)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
assertEquals("DataNode hostname config not respected", "MYHOST",
|
||||||
|
cluster5.getDataNodes().get(0).getDatanodeId().getHostName());
|
||||||
|
} finally {
|
||||||
|
MiniDFSCluster.shutdownCluster(cluster5);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -246,7 +246,7 @@ public void testGetBlockLocalPathInfo() throws IOException, InterruptedException
|
|||||||
@Override
|
@Override
|
||||||
public ClientDatanodeProtocol run() throws Exception {
|
public ClientDatanodeProtocol run() throws Exception {
|
||||||
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
|
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
|
||||||
60000);
|
60000, false);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -264,7 +264,7 @@ public ClientDatanodeProtocol run() throws Exception {
|
|||||||
@Override
|
@Override
|
||||||
public ClientDatanodeProtocol run() throws Exception {
|
public ClientDatanodeProtocol run() throws Exception {
|
||||||
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
|
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
|
||||||
60000);
|
60000, false);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
try {
|
try {
|
||||||
|
@ -304,7 +304,7 @@ public void testBlockTokenRpcLeak() throws Exception {
|
|||||||
long endTime = Time.now() + 3000;
|
long endTime = Time.now() + 3000;
|
||||||
while (Time.now() < endTime) {
|
while (Time.now() < endTime) {
|
||||||
proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000,
|
proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000,
|
||||||
fakeBlock);
|
false, fakeBlock);
|
||||||
assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
|
assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
|
||||||
if (proxy != null) {
|
if (proxy != null) {
|
||||||
RPC.stopProxy(proxy);
|
RPC.stopProxy(proxy);
|
||||||
|
@ -105,10 +105,13 @@ public static DatanodeProtocolClientSideTranslatorPB spyOnBposToNN(
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
|
public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
|
||||||
DataNode dn, DatanodeID datanodeid, final Configuration conf
|
DataNode dn, DatanodeID datanodeid, final Configuration conf,
|
||||||
) throws IOException {
|
boolean connectToDnViaHostname) throws IOException {
|
||||||
|
if (connectToDnViaHostname != dn.getDnConf().connectToDnViaHostname) {
|
||||||
|
throw new AssertionError("Unexpected DN hostname configuration");
|
||||||
|
}
|
||||||
return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
|
return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
|
||||||
dn.getDnConf().socketTimeout);
|
dn.getDnConf().socketTimeout, dn.getDnConf().connectToDnViaHostname);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void shutdownBlockScanner(DataNode dn) {
|
public static void shutdownBlockScanner(DataNode dn) {
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSClientAdapter;
|
import org.apache.hadoop.hdfs.DFSClientAdapter;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
@ -59,6 +60,8 @@
|
|||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This tests InterDataNodeProtocol for block handling.
|
* This tests InterDataNodeProtocol for block handling.
|
||||||
*/
|
*/
|
||||||
@ -125,17 +128,42 @@ public static LocatedBlock getLastLocatedBlock(
|
|||||||
return blocks.get(blocks.size() - 1);
|
return blocks.get(blocks.size() - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Test block MD access via a DN */
|
||||||
|
@Test
|
||||||
|
public void testBlockMetaDataInfo() throws Exception {
|
||||||
|
checkBlockMetaDataInfo(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** The same as above, but use hostnames for DN<->DN communication */
|
||||||
|
@Test
|
||||||
|
public void testBlockMetaDataInfoWithHostname() throws Exception {
|
||||||
|
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
|
||||||
|
checkBlockMetaDataInfo(true);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The following test first creates a file.
|
* The following test first creates a file.
|
||||||
* It verifies the block information from a datanode.
|
* It verifies the block information from a datanode.
|
||||||
* Then, it updates the block with new information and verifies again.
|
* Then, it updates the block with new information and verifies again.
|
||||||
|
* @param useDnHostname whether DNs should connect to other DNs by hostname
|
||||||
*/
|
*/
|
||||||
@Test
|
private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
|
||||||
public void testBlockMetaDataInfo() throws Exception {
|
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, useDnHostname);
|
||||||
|
if (useDnHostname) {
|
||||||
|
// Since the mini cluster only listens on the loopback we have to
|
||||||
|
// ensure the hostname used to access DNs maps to the loopback. We
|
||||||
|
// do this by telling the DN to advertise localhost as its hostname
|
||||||
|
// instead of the default hostname.
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(3)
|
||||||
|
.checkDataNodeHostConfig(true)
|
||||||
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
//create a file
|
//create a file
|
||||||
@ -154,7 +182,7 @@ public void testBlockMetaDataInfo() throws Exception {
|
|||||||
//connect to a data node
|
//connect to a data node
|
||||||
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
|
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
|
||||||
InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
|
InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
|
||||||
datanode, datanodeinfo[0], conf);
|
datanode, datanodeinfo[0], conf, useDnHostname);
|
||||||
|
|
||||||
//stop block scanner, so we could compare lastScanTime
|
//stop block scanner, so we could compare lastScanTime
|
||||||
DataNodeTestUtils.shutdownBlockScanner(datanode);
|
DataNodeTestUtils.shutdownBlockScanner(datanode);
|
||||||
@ -364,7 +392,7 @@ public void testInterDNProtocolTimeout() throws Throwable {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
proxy = DataNode.createInterDataNodeProtocolProxy(
|
proxy = DataNode.createInterDataNodeProtocolProxy(
|
||||||
dInfo, conf, 500);
|
dInfo, conf, 500, false);
|
||||||
proxy.initReplicaRecovery(new RecoveringBlock(
|
proxy.initReplicaRecovery(new RecoveringBlock(
|
||||||
new ExtendedBlock("bpid", 1), null, 100));
|
new ExtendedBlock("bpid", 1), null, 100));
|
||||||
fail ("Expected SocketTimeoutException exception, but did not get.");
|
fail ("Expected SocketTimeoutException exception, but did not get.");
|
||||||
|
Loading…
Reference in New Issue
Block a user