HDFS-5585. Provide admin commands for data node upgrade. Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1568523 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2014-02-14 21:20:02 +00:00
parent a795bc42d0
commit 1a68f13521
10 changed files with 262 additions and 0 deletions

View File

@ -39,3 +39,4 @@ HDFS-5535 subtasks:
HDFS-5494. Merge Protobuf-based-FSImage code from trunk - fix build
break after the merge. (Jing Zhao via Arpit Agarwal)
HDFS-5585. Provide admin commands for data node upgrade (kihwal)

View File

@ -124,4 +124,22 @@ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
*/
HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
List<Token<BlockTokenIdentifier>> tokens) throws IOException;
/**
* Shuts down a datanode.
*
* @param forUpgrade If true, data node does extra prep work before shutting
* down. The work includes advising clients to wait and saving
* certain states for quick restart. This should only be used when
* the stored data will remain the same during upgrade/restart.
* @throws IOException
*/
void shutdownDatanode(boolean forUpgrade) throws IOException;
/**
* Obtains datanode info
*
* @return software/config version and uptime of the datanode
*/
DatanodeLocalInfo getDatanodeInfo() throws IOException;
}

View File

@ -30,6 +30,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
@ -37,10 +39,13 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
@ -58,6 +63,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
RefreshNamenodesResponseProto.newBuilder().build();
private final static DeleteBlockPoolResponseProto DELETE_BLOCKPOOL_RESP =
DeleteBlockPoolResponseProto.newBuilder().build();
private final static ShutdownDatanodeResponseProto SHUTDOWN_DATANODE_RESP =
ShutdownDatanodeResponseProto.newBuilder().build();
private final ClientDatanodeProtocol impl;
@ -152,4 +159,28 @@ public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations(
builder.addAllVolumeIndexes(resp.getVolumeIndexes());
return builder.build();
}
@Override
public ShutdownDatanodeResponseProto shutdownDatanode(
RpcController unused, ShutdownDatanodeRequestProto request)
throws ServiceException {
try {
impl.shutdownDatanode(request.getForUpgrade());
} catch (IOException e) {
throw new ServiceException(e);
}
return SHUTDOWN_DATANODE_RESP;
}
public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused,
GetDatanodeInfoRequestProto request) throws ServiceException {
GetDatanodeInfoResponseProto res;
try {
res = GetDatanodeInfoResponseProto.newBuilder()
.setLocalInfo(PBHelper.convert(impl.getDatanodeInfo())).build();
} catch (IOException e) {
throw new ServiceException(e);
}
return res;
}
}

View File

@ -34,16 +34,20 @@
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
@ -79,6 +83,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
private final ClientDatanodeProtocolPB rpcProxy;
private final static RefreshNamenodesRequestProto VOID_REFRESH_NAMENODES =
RefreshNamenodesRequestProto.newBuilder().build();
private final static GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO =
GetDatanodeInfoRequestProto.newBuilder().build();
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@ -255,4 +261,27 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
volumeIds, volumeIndexes);
}
@Override
public void shutdownDatanode(boolean forUpgrade) throws IOException {
ShutdownDatanodeRequestProto request = ShutdownDatanodeRequestProto
.newBuilder().setForUpgrade(forUpgrade).build();
try {
rpcProxy.shutdownDatanode(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public DatanodeLocalInfo getDatanodeInfo() throws IOException {
GetDatanodeInfoResponseProto response;
try {
response = rpcProxy.getDatanodeInfo(NULL_CONTROLLER, VOID_GET_DATANODE_INFO);
return PBHelper.convert(response.getLocalInfo());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@ -107,6 +108,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto.AdminState;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
@ -1935,6 +1937,19 @@ public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) {
DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder();
builder.setSoftwareVersion(info.getSoftwareVersion());
builder.setConfigVersion(info.getConfigVersion());
builder.setUptime(info.getUptime());
return builder.build();
}
public static DatanodeLocalInfo convert(DatanodeLocalInfoProto proto) {
return new DatanodeLocalInfo(proto.getSoftwareVersion(),
proto.getConfigVersion(), proto.getUptime());
}
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();

View File

@ -29,6 +29,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
@ -73,6 +74,7 @@
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -272,6 +274,7 @@ public static InetSocketAddress createSocketAddr(String target) {
private SecureResources secureResources = null;
private List<StorageLocation> dataDirs;
private Configuration conf;
private String confVersion;
private final long maxNumberOfBlocksToLog;
private final List<String> usersWithLocalPathAccess;
@ -300,6 +303,11 @@ public static InetSocketAddress createSocketAddr(String target) {
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
confVersion = "core-" +
conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
",hdfs-" +
conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
// Determine whether we should try to pass file descriptors to clients.
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
@ -2472,6 +2480,33 @@ public void deleteBlockPool(String blockPoolId, boolean force)
data.deleteBlockPool(blockPoolId, force);
}
@Override // ClientDatanodeProtocol
public void shutdownDatanode(boolean forUpgrade) throws IOException {
LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade +
"). Shutting down Datanode...");
// Delay start the shutdown process so that the rpc response can be
// sent back.
Thread shutdownThread = new Thread() {
@Override public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) { }
shutdown();
}
};
shutdownThread.setDaemon(true);
shutdownThread.start();
}
@Override //ClientDatanodeProtocol
public DatanodeLocalInfo getDatanodeInfo() {
long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000;
return new DatanodeLocalInfo(VersionInfo.getVersion(),
confVersion, uptime);
}
/**
* @param addr rpc address of the namenode
* @return true if the datanode is connected to a NameNode at the

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@ -645,6 +646,8 @@ private void printHelp(String cmd) {
"\t[-fetchImage <local directory>]\n" +
"\t[-allowSnapshot <snapshotDir>]\n" +
"\t[-disallowSnapshot <snapshotDir>]\n" +
"\t[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]\n" +
"\t[-getDatanodeInfo <datanode_host:ipc_port>\n" +
"\t[-help [cmd]]\n";
String report ="-report: \tReports basic filesystem information and statistics.\n";
@ -741,6 +744,18 @@ private void printHelp(String cmd) {
String disallowSnapshot = "-disallowSnapshot <snapshotDir>:\n" +
"\tDo not allow snapshots to be taken on a directory any more.\n";
String shutdownDatanode = "-shutdownDatanode <datanode_host:ipc_port> [upgrade]\n" +
"\tShut down the datanode. If an optional argument \"upgrade\" is\n" +
"\tpassed, the clients will be advised to wait for the datanode to\n" +
"\trestart and the fast start-up mode will be enabled. Clients will\n" +
"\ttimeout and ignore the datanode, if the restart does not happen\n" +
"\tin time. The fast start-up mode will also be disabled, if restart\n" +
"\tis delayed too much.\n";
String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n" +
"\tCheck the datanode for liveness. If the datanode responds,\n" +
"\timore information about the datanode is printed.\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
@ -791,6 +806,10 @@ private void printHelp(String cmd) {
System.out.println(allowSnapshot);
} else if ("disallowSnapshot".equalsIgnoreCase(cmd)) {
System.out.println(disallowSnapshot);
} else if ("shutdownDatanode".equalsIgnoreCase(cmd)) {
System.out.println(shutdownDatanode);
} else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
System.out.println(getDatanodeInfo);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
@ -818,6 +837,8 @@ private void printHelp(String cmd) {
System.out.println(fetchImage);
System.out.println(allowSnapshot);
System.out.println(disallowSnapshot);
System.out.println(shutdownDatanode);
System.out.println(getDatanodeInfo);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
@ -1100,6 +1121,8 @@ private static void printUsage(String cmd) {
System.err.println(" ["+ClearSpaceQuotaCommand.USAGE+"]");
System.err.println(" [-setBalancerBandwidth <bandwidth in bytes per second>]");
System.err.println(" [-fetchImage <local directory>]");
System.err.println(" [-shutdownDatanode <datanode_host:ipc_port> [upgrade]]");
System.err.println(" [-getDatanodeInfo <datanode_host:ipc_port>]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
@ -1216,6 +1239,16 @@ public int run(String[] argv) throws Exception {
printUsage(cmd);
return exitCode;
}
} else if ("-shutdownDatanode".equals(cmd)) {
if ((argv.length != 2) && (argv.length != 3)) {
printUsage(cmd);
return exitCode;
}
} else if ("-getDatanodeInfo".equals(cmd)) {
if (argv.length != 2) {
printUsage(cmd);
return exitCode;
}
}
// initialize DFSAdmin
@ -1279,6 +1312,10 @@ public int run(String[] argv) throws Exception {
exitCode = setBalancerBandwidth(argv, i);
} else if ("-fetchImage".equals(cmd)) {
exitCode = fetchImage(argv, i);
} else if ("-shutdownDatanode".equals(cmd)) {
exitCode = shutdownDatanode(argv, i);
} else if ("-getDatanodeInfo".equals(cmd)) {
exitCode = getDatanodeInfo(argv, i);
} else if ("-help".equals(cmd)) {
if (i < argv.length) {
printHelp(argv[i]);
@ -1363,6 +1400,33 @@ private int refreshNamenodes(String[] argv, int i) throws IOException {
return 0;
}
private int shutdownDatanode(String[] argv, int i) throws IOException {
ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
boolean upgrade = false;
if (argv.length-1 == i+1) {
if ("upgrade".equals(argv[i+1])) {
upgrade = true;
} else {
printUsage("-shutdownDatanode");
return -1;
}
}
dnProxy.shutdownDatanode(upgrade);
return 0;
}
private int getDatanodeInfo(String[] argv, int i) throws IOException {
ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
try {
DatanodeLocalInfo dnInfo = dnProxy.getDatanodeInfo();
System.out.println(dnInfo.getDatanodeLocalReport());
} catch (IOException ioe) {
System.err.println("Datanode unreachable.");
return -1;
}
return 0;
}
/**
* main() has some simple utility methods.
* @param argv Command line parameters.

View File

@ -121,6 +121,28 @@ message GetHdfsBlockLocationsResponseProto {
repeated uint32 volumeIndexes = 2;
}
/**
* forUpgrade - if true, clients are advised to wait for restart and quick
* upgrade restart is instrumented. Otherwise, datanode does
* the regular shutdown.
*/
message ShutdownDatanodeRequestProto {
required bool forUpgrade = 1;
}
message ShutdownDatanodeResponseProto {
}
/**
* Ping datanode for liveness and quick info
*/
message GetDatanodeInfoRequestProto {
}
message GetDatanodeInfoResponseProto {
required DatanodeLocalInfoProto localInfo = 1;
}
/**
* Protocol used from client to the Datanode.
* See the request and response for details of rpc call.
@ -158,4 +180,10 @@ service ClientDatanodeProtocolService {
*/
rpc getHdfsBlockLocations(GetHdfsBlockLocationsRequestProto)
returns(GetHdfsBlockLocationsResponseProto);
rpc shutdownDatanode(ShutdownDatanodeRequestProto)
returns(ShutdownDatanodeResponseProto);
rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
returns(GetDatanodeInfoResponseProto);
}

View File

@ -60,6 +60,15 @@ message DatanodeIDProto {
optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
}
/**
* Datanode local information
*/
message DatanodeLocalInfoProto {
required string softwareVersion = 1;
required string configVersion = 2;
required uint64 uptime = 3;
}
/**
* DatanodeInfo array
*/

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@ -262,4 +263,35 @@ public void testSecondaryNameNode() throws Exception {
RollingUpgradeStartupOption.STARTED.name()};
SecondaryNameNode.main(args);
}
@Test
public void testDFSAdminDatanodeUpgradeControlCommands() throws Exception {
// start a cluster
final Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
final DFSAdmin dfsadmin = new DFSAdmin(conf);
DataNode dn = cluster.getDataNodes().get(0);
// check the datanode
final String dnAddr = dn.getDatanodeId().getIpcAddr(false);
final String[] args1 = {"-getDatanodeInfo", dnAddr};
Assert.assertEquals(0, dfsadmin.run(args1));
// issue shutdown to the datanode.
final String[] args2 = {"-shutdownDatanode", dnAddr, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args2));
// the datanode should be down.
Thread.sleep(2000);
Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
// ping should fail.
Assert.assertEquals(-1, dfsadmin.run(args1));
} finally {
if (cluster != null) cluster.shutdown();
}
}
}