HDFS-3139. Minor Datanode logging improvement. Contributed by Eli Collins
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1306549 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
aa8cb2287f
commit
0475795066
@ -873,6 +873,8 @@ Release 0.23.1 - 2012-02-17
|
|||||||
|
|
||||||
HDFS-2868. Expose xceiver counts via the DataNode MXBean. (harsh)
|
HDFS-2868. Expose xceiver counts via the DataNode MXBean. (harsh)
|
||||||
|
|
||||||
|
HDFS-3139. Minor Datanode logging improvement. (eli)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-2130. Switch default checksum to CRC32C. (todd)
|
HDFS-2130. Switch default checksum to CRC32C. (todd)
|
||||||
|
@ -28,20 +28,20 @@
|
|||||||
import org.apache.hadoop.io.WritableComparable;
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DatanodeID is composed of the data node
|
* This class represents the primary identifier for a Datanode.
|
||||||
* name (hostname:portNumber) and the data storage ID,
|
* Datanodes are identified by how they can be contacted (hostname
|
||||||
* which it currently represents.
|
* and ports) and their storage ID, a unique number that associates
|
||||||
*
|
* the Datanodes blocks with a particular Datanode.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class DatanodeID implements WritableComparable<DatanodeID> {
|
public class DatanodeID implements WritableComparable<DatanodeID> {
|
||||||
public static final DatanodeID[] EMPTY_ARRAY = {};
|
public static final DatanodeID[] EMPTY_ARRAY = {};
|
||||||
|
|
||||||
public String name; /// hostname:portNumber
|
public String name; // hostname:port (data transfer port)
|
||||||
public String storageID; /// unique per cluster storageID
|
public String storageID; // unique per cluster storageID
|
||||||
protected int infoPort; /// the port where the infoserver is running
|
protected int infoPort; // info server port
|
||||||
public int ipcPort; /// the port where the ipc server is running
|
public int ipcPort; // ipc server port
|
||||||
|
|
||||||
/** Equivalent to DatanodeID(""). */
|
/** Equivalent to DatanodeID(""). */
|
||||||
public DatanodeID() {this("");}
|
public DatanodeID() {this("");}
|
||||||
|
@ -37,9 +37,9 @@
|
|||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DatanodeInfo represents the status of a DataNode.
|
* This class extends the primary identifier of a Datanode with ephemeral
|
||||||
* This object is used for communication in the
|
* state, eg usage information, current administrative state, and the
|
||||||
* Datanode Protocol and the Client Protocol.
|
* network location that is communicated to clients.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
@ -52,12 +52,10 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
|||||||
protected int xceiverCount;
|
protected int xceiverCount;
|
||||||
protected String location = NetworkTopology.DEFAULT_RACK;
|
protected String location = NetworkTopology.DEFAULT_RACK;
|
||||||
|
|
||||||
/** HostName as supplied by the datanode during registration as its
|
// The FQDN of the IP associated with the Datanode's hostname
|
||||||
* name. Namenode uses datanode IP address as the name.
|
|
||||||
*/
|
|
||||||
protected String hostName = null;
|
protected String hostName = null;
|
||||||
|
|
||||||
// administrative states of a datanode
|
// Datanode administrative states
|
||||||
public enum AdminStates {
|
public enum AdminStates {
|
||||||
NORMAL("In Service"),
|
NORMAL("In Service"),
|
||||||
DECOMMISSION_INPROGRESS("Decommission In Progress"),
|
DECOMMISSION_INPROGRESS("Decommission In Progress"),
|
||||||
@ -241,12 +239,14 @@ public String getDatanodeReport() {
|
|||||||
long nonDFSUsed = getNonDfsUsed();
|
long nonDFSUsed = getNonDfsUsed();
|
||||||
float usedPercent = getDfsUsedPercent();
|
float usedPercent = getDfsUsedPercent();
|
||||||
float remainingPercent = getRemainingPercent();
|
float remainingPercent = getRemainingPercent();
|
||||||
String hostName = NetUtils.getHostNameOfIP(name);
|
String lookupName = NetUtils.getHostNameOfIP(name);
|
||||||
|
|
||||||
buffer.append("Name: "+ name);
|
buffer.append("Name: "+ name);
|
||||||
if(hostName != null)
|
if (lookupName != null) {
|
||||||
buffer.append(" (" + hostName + ")");
|
buffer.append(" (" + lookupName + ")");
|
||||||
|
}
|
||||||
buffer.append("\n");
|
buffer.append("\n");
|
||||||
|
buffer.append("Hostname: " + getHostName() + "\n");
|
||||||
|
|
||||||
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
|
if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
|
||||||
buffer.append("Rack: "+location+"\n");
|
buffer.append("Rack: "+location+"\n");
|
||||||
|
@ -34,16 +34,13 @@
|
|||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
|
||||||
/**************************************************
|
/**
|
||||||
* DatanodeDescriptor tracks stats on a given DataNode, such as
|
* This class extends the DatanodeInfo class with ephemeral information (eg
|
||||||
* available storage capacity, last update time, etc., and maintains a
|
* health, capacity, what blocks are associated with the Datanode) that is
|
||||||
* set of blocks stored on the datanode.
|
* private to the Namenode, ie this class is not exposed to clients.
|
||||||
*
|
*/
|
||||||
* This data structure is internal to the namenode. It is *not* sent
|
|
||||||
* over-the-wire to the Client or the Datanodes. Neither is it stored
|
|
||||||
* persistently in the fsImage.
|
|
||||||
**************************************************/
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
public class DatanodeDescriptor extends DatanodeInfo {
|
public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
|
|
||||||
// Stores status of decommissioning.
|
// Stores status of decommissioning.
|
||||||
@ -586,14 +583,14 @@ public void updateRegInfo(DatanodeID nodeReg) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Blanacer bandwidth in bytes per second for this datanode.
|
* @return balancer bandwidth in bytes per second for this datanode
|
||||||
*/
|
*/
|
||||||
public long getBalancerBandwidth() {
|
public long getBalancerBandwidth() {
|
||||||
return this.bandwidth;
|
return this.bandwidth;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param bandwidth Blanacer bandwidth in bytes per second for this datanode.
|
* @param bandwidth balancer bandwidth in bytes per second for this datanode
|
||||||
*/
|
*/
|
||||||
public void setBalancerBandwidth(long bandwidth) {
|
public void setBalancerBandwidth(long bandwidth) {
|
||||||
this.bandwidth = bandwidth;
|
this.bandwidth = bandwidth;
|
||||||
|
@ -330,9 +330,7 @@ conf, new AccessControlList(conf.get(DFS_ADMIN, " ")))
|
|||||||
: new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
|
: new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
|
||||||
conf, new AccessControlList(conf.get(DFS_ADMIN, " ")),
|
conf, new AccessControlList(conf.get(DFS_ADMIN, " ")),
|
||||||
secureResources.getListener());
|
secureResources.getListener());
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort);
|
||||||
LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
|
|
||||||
}
|
|
||||||
if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
|
if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
|
||||||
boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
|
boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
|
||||||
DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
|
DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
|
||||||
@ -398,6 +396,7 @@ private void initIpcServer(Configuration conf) throws IOException {
|
|||||||
.newReflectiveBlockingService(interDatanodeProtocolXlator);
|
.newReflectiveBlockingService(interDatanodeProtocolXlator);
|
||||||
DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
|
DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
|
||||||
ipcServer);
|
ipcServer);
|
||||||
|
LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());
|
||||||
|
|
||||||
// set service-level authorization security policy
|
// set service-level authorization security policy
|
||||||
if (conf.getBoolean(
|
if (conf.getBoolean(
|
||||||
@ -486,14 +485,14 @@ private synchronized void shutdownDirectoryScanner() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void initDataXceiver(Configuration conf) throws IOException {
|
private void initDataXceiver(Configuration conf) throws IOException {
|
||||||
InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
|
InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf);
|
||||||
|
|
||||||
// find free port or use privileged port provided
|
// find free port or use privileged port provided
|
||||||
ServerSocket ss;
|
ServerSocket ss;
|
||||||
if(secureResources == null) {
|
if(secureResources == null) {
|
||||||
ss = (dnConf.socketWriteTimeout > 0) ?
|
ss = (dnConf.socketWriteTimeout > 0) ?
|
||||||
ServerSocketChannel.open().socket() : new ServerSocket();
|
ServerSocketChannel.open().socket() : new ServerSocket();
|
||||||
Server.bind(ss, socAddr, 0);
|
Server.bind(ss, streamingAddr, 0);
|
||||||
} else {
|
} else {
|
||||||
ss = secureResources.getStreamingSocket();
|
ss = secureResources.getStreamingSocket();
|
||||||
}
|
}
|
||||||
@ -502,8 +501,7 @@ private void initDataXceiver(Configuration conf) throws IOException {
|
|||||||
int tmpPort = ss.getLocalPort();
|
int tmpPort = ss.getLocalPort();
|
||||||
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
||||||
tmpPort);
|
tmpPort);
|
||||||
LOG.info("Opened info server at " + tmpPort);
|
LOG.info("Opened streaming server at " + selfAddr);
|
||||||
|
|
||||||
this.threadGroup = new ThreadGroup("dataXceiverServer");
|
this.threadGroup = new ThreadGroup("dataXceiverServer");
|
||||||
this.dataXceiverServer = new Daemon(threadGroup,
|
this.dataXceiverServer = new Daemon(threadGroup,
|
||||||
new DataXceiverServer(ss, conf, this));
|
new DataXceiverServer(ss, conf, this));
|
||||||
|
@ -69,18 +69,19 @@ public void init(DaemonContext context) throws Exception {
|
|||||||
args = context.getArguments();
|
args = context.getArguments();
|
||||||
|
|
||||||
// Obtain secure port for data streaming to datanode
|
// Obtain secure port for data streaming to datanode
|
||||||
InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
|
InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf);
|
||||||
int socketWriteTimeout = conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
int socketWriteTimeout = conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
||||||
HdfsServerConstants.WRITE_TIMEOUT);
|
HdfsServerConstants.WRITE_TIMEOUT);
|
||||||
|
|
||||||
ServerSocket ss = (socketWriteTimeout > 0) ?
|
ServerSocket ss = (socketWriteTimeout > 0) ?
|
||||||
ServerSocketChannel.open().socket() : new ServerSocket();
|
ServerSocketChannel.open().socket() : new ServerSocket();
|
||||||
ss.bind(socAddr, 0);
|
ss.bind(streamingAddr, 0);
|
||||||
|
|
||||||
// Check that we got the port we need
|
// Check that we got the port we need
|
||||||
if(ss.getLocalPort() != socAddr.getPort())
|
if (ss.getLocalPort() != streamingAddr.getPort()) {
|
||||||
throw new RuntimeException("Unable to bind on specified streaming port in secure " +
|
throw new RuntimeException("Unable to bind on specified streaming port in secure " +
|
||||||
"context. Needed " + socAddr.getPort() + ", got " + ss.getLocalPort());
|
"context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
|
||||||
|
}
|
||||||
|
|
||||||
// Obtain secure listener for web server
|
// Obtain secure listener for web server
|
||||||
SelectChannelConnector listener =
|
SelectChannelConnector listener =
|
||||||
@ -90,15 +91,18 @@ public void init(DaemonContext context) throws Exception {
|
|||||||
listener.setPort(infoSocAddr.getPort());
|
listener.setPort(infoSocAddr.getPort());
|
||||||
// Open listener here in order to bind to port as root
|
// Open listener here in order to bind to port as root
|
||||||
listener.open();
|
listener.open();
|
||||||
if(listener.getPort() != infoSocAddr.getPort())
|
if (listener.getPort() != infoSocAddr.getPort()) {
|
||||||
throw new RuntimeException("Unable to bind on specified info port in secure " +
|
throw new RuntimeException("Unable to bind on specified info port in secure " +
|
||||||
"context. Needed " + socAddr.getPort() + ", got " + ss.getLocalPort());
|
"context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
|
||||||
|
}
|
||||||
System.err.println("Successfully obtained privileged resources (streaming port = "
|
System.err.println("Successfully obtained privileged resources (streaming port = "
|
||||||
+ ss + " ) (http listener port = " + listener.getConnection() +")");
|
+ ss + " ) (http listener port = " + listener.getConnection() +")");
|
||||||
|
|
||||||
if(ss.getLocalPort() >= 1023 || listener.getPort() >= 1023)
|
if (ss.getLocalPort() >= 1023 || listener.getPort() >= 1023) {
|
||||||
throw new RuntimeException("Cannot start secure datanode with unprivileged ports");
|
throw new RuntimeException("Cannot start secure datanode with unprivileged ports");
|
||||||
|
}
|
||||||
|
System.err.println("Opened streaming server at " + streamingAddr);
|
||||||
|
System.err.println("Opened info server at " + infoSocAddr);
|
||||||
resources = new SecureResources(ss, listener);
|
resources = new SecureResources(ss, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ public void testDFSAddressConfig() throws IOException {
|
|||||||
|
|
||||||
String selfSocketAddr = dn.getSelfAddr().toString();
|
String selfSocketAddr = dn.getSelfAddr().toString();
|
||||||
System.out.println("DN Self Socket Addr == " + selfSocketAddr);
|
System.out.println("DN Self Socket Addr == " + selfSocketAddr);
|
||||||
assertTrue(selfSocketAddr.startsWith("/127.0.0.1:"));
|
assertTrue(selfSocketAddr.contains("/127.0.0.1:"));
|
||||||
|
|
||||||
/*-------------------------------------------------------------------------
|
/*-------------------------------------------------------------------------
|
||||||
* Shut down the datanodes, reconfigure, and bring them back up.
|
* Shut down the datanodes, reconfigure, and bring them back up.
|
||||||
@ -78,7 +78,7 @@ public void testDFSAddressConfig() throws IOException {
|
|||||||
selfSocketAddr = dn.getSelfAddr().toString();
|
selfSocketAddr = dn.getSelfAddr().toString();
|
||||||
System.out.println("DN Self Socket Addr == " + selfSocketAddr);
|
System.out.println("DN Self Socket Addr == " + selfSocketAddr);
|
||||||
// assert that default self socket address is 127.0.0.1
|
// assert that default self socket address is 127.0.0.1
|
||||||
assertTrue(selfSocketAddr.startsWith("/127.0.0.1:"));
|
assertTrue(selfSocketAddr.contains("/127.0.0.1:"));
|
||||||
|
|
||||||
/*-------------------------------------------------------------------------
|
/*-------------------------------------------------------------------------
|
||||||
* Shut down the datanodes, reconfigure, and bring them back up.
|
* Shut down the datanodes, reconfigure, and bring them back up.
|
||||||
@ -103,7 +103,7 @@ public void testDFSAddressConfig() throws IOException {
|
|||||||
selfSocketAddr = dn.getSelfAddr().toString();
|
selfSocketAddr = dn.getSelfAddr().toString();
|
||||||
System.out.println("DN Self Socket Addr == " + selfSocketAddr);
|
System.out.println("DN Self Socket Addr == " + selfSocketAddr);
|
||||||
// assert that default self socket address is 0.0.0.0
|
// assert that default self socket address is 0.0.0.0
|
||||||
assertTrue(selfSocketAddr.startsWith("/0.0.0.0:"));
|
assertTrue(selfSocketAddr.contains("/0.0.0.0:"));
|
||||||
|
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user