HDFS-17396. BootstrapStandby should download rollback image during RollingUpgrade (#6583)
This commit is contained in:
parent
a2d7241190
commit
5584efd8d4
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
@ -114,6 +115,17 @@ public long getMostRecentCheckpointTxId() throws IOException {
|
||||
return rpcServer.invokeAtAvailableNs(method, long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
|
||||
throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.READ);
|
||||
|
||||
RemoteMethod method =
|
||||
new RemoteMethod(NamenodeProtocol.class, "getMostRecentNameNodeFileTxId",
|
||||
new Class<?>[] {NNStorage.NameNodeFile.class}, nnf);
|
||||
return rpcServer.invokeAtAvailableNs(method, long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckpointSignature rollEditLog() throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.WRITE, false);
|
||||
|
@ -146,6 +146,7 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
@ -1641,6 +1642,12 @@ public long getMostRecentCheckpointTxId() throws IOException {
|
||||
return nnProto.getMostRecentCheckpointTxId();
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
|
||||
throws IOException {
|
||||
return nnProto.getMostRecentNameNodeFileTxId(nnf);
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
public CheckpointSignature rollEditLog() throws IOException {
|
||||
return nnProto.rollEditLog();
|
||||
|
@ -35,6 +35,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
|
||||
@ -51,6 +53,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
@ -141,6 +144,20 @@ public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
|
||||
return GetMostRecentCheckpointTxIdResponseProto.newBuilder().setTxId(txid).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetMostRecentNameNodeFileTxIdResponseProto getMostRecentNameNodeFileTxId(
|
||||
RpcController unused, GetMostRecentNameNodeFileTxIdRequestProto request)
|
||||
throws ServiceException {
|
||||
long txid;
|
||||
try {
|
||||
txid = impl.getMostRecentNameNodeFileTxId(
|
||||
NNStorage.NameNodeFile.valueOf(request.getNameNodeFile()));
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return GetMostRecentNameNodeFileTxIdResponseProto.newBuilder().setTxId(txid).build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RollEditLogResponseProto rollEditLog(RpcController unused,
|
||||
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
|
||||
@ -46,6 +47,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
@ -134,6 +136,14 @@ public long getMostRecentCheckpointTxId() throws IOException {
|
||||
GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException {
|
||||
return ipc(() -> rpcProxy.getMostRecentNameNodeFileTxId(NULL_CONTROLLER,
|
||||
GetMostRecentNameNodeFileTxIdRequestProto.newBuilder()
|
||||
.setNameNodeFile(nnf.toString()).build()).getTxId());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckpointSignature rollEditLog() throws IOException {
|
||||
return PBHelper.convert(ipc(() -> rpcProxy.rollEditLog(NULL_CONTROLLER,
|
||||
|
@ -1562,4 +1562,31 @@ public void updateLastAppliedTxIdFromWritten() {
|
||||
public long getMostRecentCheckpointTxId() {
|
||||
return storage.getMostRecentCheckpointTxId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a NameNodeFile type, retrieve the latest txid for that file or {@link
|
||||
* HdfsServerConstants#INVALID_TXID} if the file does not exist.
|
||||
*
|
||||
* @param nnf The NameNodeFile type to retrieve the latest txid from.
|
||||
* @return the latest txid for the NameNodeFile type, or {@link
|
||||
* HdfsServerConstants#INVALID_TXID} if there is no FSImage file of the type
|
||||
* requested.
|
||||
* @throws IOException
|
||||
*/
|
||||
public long getMostRecentNameNodeFileTxId(NameNodeFile nnf)
|
||||
throws IOException {
|
||||
final FSImageStorageInspector inspector =
|
||||
new FSImageTransactionalStorageInspector(EnumSet.of(nnf));
|
||||
storage.inspectStorageDirs(inspector);
|
||||
try {
|
||||
List<FSImageFile> images = inspector.getLatestImages();
|
||||
if (images != null && !images.isEmpty()) {
|
||||
return images.get(0).getCheckpointTxId();
|
||||
} else {
|
||||
return HdfsServerConstants.INVALID_TXID;
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
return HdfsServerConstants.INVALID_TXID;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1361,6 +1361,14 @@ public long getMostRecentCheckpointTxId() throws IOException {
|
||||
namesystem.checkSuperuserPrivilege(operationName);
|
||||
return namesystem.getFSImage().getMostRecentCheckpointTxId();
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException {
|
||||
checkNNStartup();
|
||||
namesystem.checkOperation(OperationCategory.UNCHECKED);
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
return namesystem.getFSImage().getMostRecentNameNodeFileTxId(nnf);
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
public CheckpointSignature rollEditLog() throws IOException {
|
||||
|
@ -248,7 +248,7 @@ private int doRun() throws IOException {
|
||||
}
|
||||
|
||||
// download the fsimage from active namenode
|
||||
int download = downloadImage(storage, proxy, proxyInfo);
|
||||
int download = downloadImage(storage, proxy, proxyInfo, isRollingUpgrade);
|
||||
if (download != 0) {
|
||||
return download;
|
||||
}
|
||||
@ -351,12 +351,32 @@ private void doUpgrade(NNStorage storage) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo)
|
||||
private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo,
|
||||
boolean isRollingUpgrade)
|
||||
throws IOException {
|
||||
// Load the newly formatted image, using all of the directories
|
||||
// (including shared edits)
|
||||
final long imageTxId = proxy.getMostRecentCheckpointTxId();
|
||||
final long curTxId = proxy.getTransactionID();
|
||||
|
||||
if (isRollingUpgrade) {
|
||||
final long rollbackTxId =
|
||||
proxy.getMostRecentNameNodeFileTxId(NameNodeFile.IMAGE_ROLLBACK);
|
||||
assert rollbackTxId != HdfsServerConstants.INVALID_TXID :
|
||||
"Expected a valid TXID for fsimage_rollback file";
|
||||
FSImage rollbackImage = new FSImage(conf);
|
||||
try {
|
||||
rollbackImage.getStorage().setStorageInfo(storage);
|
||||
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
||||
proxyInfo.getHttpAddress(), rollbackTxId, storage, true, true);
|
||||
rollbackImage.saveDigestAndRenameCheckpointImage(
|
||||
NameNodeFile.IMAGE_ROLLBACK, rollbackTxId, hash);
|
||||
} catch (IOException ioe) {
|
||||
throw ioe;
|
||||
} finally {
|
||||
rollbackImage.close();
|
||||
}
|
||||
}
|
||||
FSImage image = new FSImage(conf);
|
||||
try {
|
||||
image.getStorage().setStorageInfo(storage);
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
|
||||
import org.apache.hadoop.io.retry.AtMostOnce;
|
||||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
@ -111,6 +112,12 @@ BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
||||
@Idempotent
|
||||
public long getMostRecentCheckpointTxId() throws IOException;
|
||||
|
||||
/**
|
||||
* Get the transaction ID of the most recent checkpoint for the given NameNodeFile.
|
||||
*/
|
||||
@Idempotent
|
||||
long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException;
|
||||
|
||||
/**
|
||||
* Closes the current edit log and opens a new one. The
|
||||
* call fails if the file system is in SafeMode.
|
||||
|
@ -108,6 +108,14 @@ message GetMostRecentCheckpointTxIdResponseProto{
|
||||
required uint64 txId = 1;
|
||||
}
|
||||
|
||||
message GetMostRecentNameNodeFileTxIdRequestProto {
|
||||
required string nameNodeFile = 1;
|
||||
}
|
||||
|
||||
message GetMostRecentNameNodeFileTxIdResponseProto{
|
||||
required uint64 txId = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* registration - Namenode reporting the error
|
||||
* errorCode - error code indicating the error
|
||||
@ -253,6 +261,12 @@ service NamenodeProtocolService {
|
||||
rpc getMostRecentCheckpointTxId(GetMostRecentCheckpointTxIdRequestProto)
|
||||
returns(GetMostRecentCheckpointTxIdResponseProto);
|
||||
|
||||
/**
|
||||
* Get the transaction ID of the NameNodeFile
|
||||
*/
|
||||
rpc getMostRecentNameNodeFileTxId(GetMostRecentNameNodeFileTxIdRequestProto)
|
||||
returns(GetMostRecentNameNodeFileTxIdResponseProto);
|
||||
|
||||
/**
|
||||
* Close the current editlog and open a new one for checkpointing purposes
|
||||
*/
|
||||
|
@ -519,6 +519,23 @@ public static void assertNNHasCheckpoints(MiniDFSCluster cluster,
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertNNHasRollbackCheckpoints(MiniDFSCluster cluster,
|
||||
int nnIdx, List<Integer> txids) {
|
||||
|
||||
for (File nameDir : getNameNodeCurrentDirs(cluster, nnIdx)) {
|
||||
LOG.info("examining name dir with files: {}",
|
||||
Joiner.on(",").join(nameDir.listFiles()));
|
||||
// Should have fsimage_N for the three checkpoints
|
||||
LOG.info("Examining storage dir {} with contents: {}", nameDir,
|
||||
StringUtils.join(nameDir.listFiles(), ", "));
|
||||
for (long checkpointTxId : txids) {
|
||||
File image = new File(nameDir,
|
||||
NNStorage.getRollbackImageFileName(checkpointTxId));
|
||||
assertTrue("Expected non-empty " + image, image.length() > 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster, int nnIdx) {
|
||||
List<File> nameDirs = Lists.newArrayList();
|
||||
for (URI u : cluster.getNameDirs(nnIdx)) {
|
||||
|
@ -40,6 +40,7 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -189,7 +190,8 @@ public void testDownloadingLaterCheckpoint() throws Exception {
|
||||
*/
|
||||
@Test
|
||||
public void testRollingUpgradeBootstrapStandby() throws Exception {
|
||||
removeStandbyNameDirs();
|
||||
// This node is needed to create the rollback fsimage
|
||||
cluster.restartNameNode(1);
|
||||
|
||||
int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
|
||||
|
||||
@ -208,12 +210,18 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {
|
||||
|
||||
// BootstrapStandby should fail if the node has a future version
|
||||
// and the cluster isn't in rolling upgrade
|
||||
bs.setConf(cluster.getConfiguration(1));
|
||||
bs.setConf(cluster.getConfiguration(2));
|
||||
assertEquals("BootstrapStandby should return ERR_CODE_INVALID_VERSION",
|
||||
ERR_CODE_INVALID_VERSION, bs.run(new String[]{"-force"}));
|
||||
|
||||
// Start rolling upgrade
|
||||
fs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
LambdaTestUtils.await(60000, 1000, () ->
|
||||
fs.rollingUpgrade(RollingUpgradeAction.QUERY).createdRollbackImages());
|
||||
// After the rollback image is created the standby is not needed
|
||||
cluster.shutdownNameNode(1);
|
||||
removeStandbyNameDirs();
|
||||
|
||||
nn0 = spy(nn0);
|
||||
|
||||
// Make nn0 think it is a future version
|
||||
@ -237,6 +245,9 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {
|
||||
|
||||
long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
|
||||
.getFSImage().getMostRecentCheckpointTxId();
|
||||
long expectedRollbackTxId = NameNodeAdapter.getNamesystem(nn0)
|
||||
.getFSImage().getMostRecentNameNodeFileTxId(
|
||||
NNStorage.NameNodeFile.IMAGE_ROLLBACK);
|
||||
assertEquals(11, expectedCheckpointTxId);
|
||||
|
||||
for (int i = 1; i < maxNNCount; i++) {
|
||||
@ -245,6 +256,8 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {
|
||||
bs.run(new String[]{"-force"});
|
||||
FSImageTestUtil.assertNNHasCheckpoints(cluster, i,
|
||||
ImmutableList.of((int) expectedCheckpointTxId));
|
||||
FSImageTestUtil.assertNNHasRollbackCheckpoints(cluster, i,
|
||||
ImmutableList.of((int) expectedRollbackTxId));
|
||||
}
|
||||
|
||||
// Make sure the bootstrap was successful
|
||||
@ -253,6 +266,14 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {
|
||||
// We should now be able to start the standby successfully
|
||||
restartNameNodesFromIndex(1, "-rollingUpgrade", "started");
|
||||
|
||||
for (int i = 1; i < maxNNCount; i++) {
|
||||
NameNode nn = cluster.getNameNode(i);
|
||||
assertTrue("NameNodes should all have the rollback FSImage",
|
||||
nn.getFSImage().hasRollbackFSImage());
|
||||
assertTrue("NameNodes should all be inRollingUpgrade",
|
||||
nn.getNamesystem().isRollingUpgrade());
|
||||
}
|
||||
|
||||
// Cleanup standby dirs
|
||||
for (int i = 1; i < maxNNCount; i++) {
|
||||
cluster.shutdownNameNode(i);
|
||||
|
Loading…
Reference in New Issue
Block a user