From e2a0dca43b57993fe8bd1be05281e126a325cf83 Mon Sep 17 00:00:00 2001
From: Aswin M Prabhu <31558262+aswinmprabhu@users.noreply.github.com>
Date: Tue, 23 Jul 2024 18:25:57 +0530
Subject: [PATCH] HDFS-16690. Automatically format unformatted JNs with
 JournalNodeSyncer (#6925). Contributed by Aswin M Prabhu.

Signed-off-by: He Xiaoqiao
---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  3 +
 .../protocol/InterQJournalProtocol.java       | 10 +++
 ...JournalProtocolServerSideTranslatorPB.java | 16 ++++
 .../InterQJournalProtocolTranslatorPB.java    | 14 ++++
 .../qjournal/server/JournalNodeRpcServer.java | 28 ++++---
 .../qjournal/server/JournalNodeSyncer.java    | 81 ++++++++++++++++++-
 .../main/proto/InterQJournalProtocol.proto    |  7 ++
 .../src/main/resources/hdfs-default.xml       | 10 +++
 .../qjournal/server/TestJournalNodeSync.java  | 44 ++++++++++
 9 files changed, 202 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b9f8e07f67..dd3193fdad 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1471,6 +1471,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
       "dfs.journalnode.sync.interval";
   public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
+  public static final String DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY =
+      "dfs.journalnode.enable.sync.format";
+  public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT = false;
   public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
       "dfs.journalnode.edit-cache-size.bytes";

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
index f1f7e9ce1f..c3eed14c3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.security.KerberosInfo;
@@ -51,4 +52,13 @@
   GetEditLogManifestResponseProto getEditLogManifestFromJournal(
       String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
       throws IOException;
+
+  /**
+   * Get the storage info for the specified journal.
+   * @param jid the journal identifier
+   * @param nameServiceId the name service id
+   * @return the storage info object
+   */
+  StorageInfoProto getStorageInfo(String jid, String nameServiceId)
+      throws IOException;
+
 }
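The two new keys mirror the existing dfs.journalnode.sync.* pattern and default the feature to off. A minimal sketch of enabling it programmatically, modeled on the test setup at the end of this patch (the class name is illustrative; only the two setBoolean calls come from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class EnableSyncFormatExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Auto-format is only attempted from the syncer daemon, so the
        // existing sync switch must stay enabled alongside the new one.
        conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
        conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, true);
      }
    }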
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
index ba5ddb1ab6..ac67bcb0cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
@@ -24,6 +24,8 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
@@ -60,4 +62,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public StorageInfoProto getStorageInfo(
+      RpcController controller, GetStorageInfoRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getStorageInfo(
+          request.getJid().getIdentifier(),
+          request.hasNameServiceId() ? request.getNameServiceId() : null
+      );
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
index 4544308fff..49ae53fcee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hdfs.qjournal.protocolPB;

+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -75,6 +77,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
         req.build()));
   }
+
+  @Override
+  public StorageInfoProto getStorageInfo(String jid, String nameServiceId)
+      throws IOException {
+    InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder req =
+        InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder()
+            .setJid(convertJournalId(jid));
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, req.build()));
+  }
+
   private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
     return QJournalProtocolProtos.JournalIdProto.newBuilder()
         .setIdentifier(jid)
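On the wire, the client-side translator wraps the journal id in a JournalIdProto and sets nameServiceId only when present, since that proto field is optional. A sketch of building the same request directly, with placeholder values ("myjournal", "ns1") that are not from the patch:

    import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;

    public class GetStorageInfoRequestExample {
      public static void main(String[] args) {
        GetStorageInfoRequestProto req = GetStorageInfoRequestProto.newBuilder()
            .setJid(JournalIdProto.newBuilder().setIdentifier("myjournal"))
            .setNameServiceId("ns1") // optional; omit on non-federated clusters
            .build();
        System.out.println(req); // prints the field values for inspection
      }
    }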
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index 7e33ab5c75..b09d09aed0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.qjournal.server;

 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
 import org.apache.hadoop.thirdparty.protobuf.BlockingService;
 import org.slf4j.Logger;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,14 +72,14 @@ public class JournalNodeRpcServer implements QJournalProtocol,
   JournalNodeRpcServer(Configuration conf, JournalNode jn)
       throws IOException {
     this.jn = jn;
-
+
     Configuration confCopy = new Configuration(conf);
-
+
     // Ensure that nagling doesn't kick in, which could cause latency issues.
     confCopy.setBoolean(
         CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
         true);
-
+
     InetSocketAddress addr = getAddress(confCopy);
     String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
     if (bindHost == null) {
@@ -104,7 +105,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
     this.handlerCount = confHandlerCount;
     LOG.info("The number of JournalNodeRpcServer handlers is {}.",
         this.handlerCount);
-
+
     this.server = new RPC.Builder(confCopy)
         .setProtocol(QJournalProtocolPB.class)
         .setInstance(service)
@@ -149,15 +150,15 @@ void start() {
   public InetSocketAddress getAddress() {
     return server.getListenerAddress();
   }
-
+
   void join() throws InterruptedException {
     this.server.join();
   }
-
+
   void stop() {
     this.server.stop();
   }
-
+
   static InetSocketAddress getAddress(Configuration conf) {
     String addr = conf.get(
         DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
@@ -211,7 +212,7 @@ public void journal(RequestInfo reqInfo,
     jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
         .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
   }
-
+
   @Override
   public void heartbeat(RequestInfo reqInfo) throws IOException {
     jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
@@ -245,10 +246,10 @@ public GetEditLogManifestResponseProto getEditLogManifest(
       String jid, String nameServiceId,
       long sinceTxId, boolean inProgressOk)
       throws IOException {
-
+
     RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
         .getEditLogManifest(sinceTxId, inProgressOk);
-
+
     return GetEditLogManifestResponseProto.newBuilder()
         .setManifest(PBHelper.convert(manifest))
         .setHttpPort(jn.getBoundHttpAddress().getPort())
@@ -256,6 +257,13 @@ public GetEditLogManifestResponseProto getEditLogManifest(
         .build();
   }

+  @Override
+  public StorageInfoProto getStorageInfo(String jid,
+      String nameServiceId) throws IOException {
+    StorageInfo storage = jn.getOrCreateJournal(jid, nameServiceId).getStorage();
+    return PBHelper.convert(storage);
+  }
+
   @Override
   public GetJournaledEditsResponseProto getJournaledEdits(String jid,
       String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
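The server side simply converts the journal's on-disk StorageInfo to its proto form; a journal that has never been formatted reports namespaceID == 0, which is the signal the syncer keys off below. A tiny helper expressing that check (the class and method names are mine, not part of the patch):

    import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;

    final class StorageInfoChecks {
      private StorageInfoChecks() {
      }

      // An unformatted journal reports namespaceID == 0; formatWithSyncer()
      // in the next file applies the same test to discard unusable peers.
      static boolean looksFormatted(StorageInfoProto info) {
        return info.getNamespaceID() != 0;
      }
    }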
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index f451b46de7..75010596b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.qjournal.server;

 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -79,6 +82,7 @@ public class JournalNodeSyncer {
   private int numOtherJNs;
   private int journalNodeIndexForSync = 0;
   private final long journalSyncInterval;
+  private final boolean tryFormatting;
   private final int logSegmentTransferTimeout;
   private final DataTransferThrottler throttler;
   private final JournalMetrics metrics;
@@ -98,6 +102,9 @@ public class JournalNodeSyncer {
     logSegmentTransferTimeout = conf.getInt(
         DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
         DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
+    tryFormatting = conf.getBoolean(
+        DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT);
     throttler = getThrottler(conf);
     metrics = journal.getMetrics();
     journalSyncerStarted = false;
@@ -171,6 +178,8 @@ private void startSyncJournalsDaemon() {
       // Wait for journal to be formatted to create edits.sync directory
       while(!journal.isFormatted()) {
         try {
+          // Format the journal with namespace info from the other JNs if it is not formatted
+          formatWithSyncer();
           Thread.sleep(journalSyncInterval);
         } catch (InterruptedException e) {
           LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
@@ -187,7 +196,15 @@ private void startSyncJournalsDaemon() {
       while(shouldSync) {
         try {
           if (!journal.isFormatted()) {
-            LOG.warn("Journal cannot sync. Not formatted.");
+            LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer.");
+            formatWithSyncer();
+            if (journal.isFormatted() && !createEditsSyncDir()) {
+              LOG.error("Failed to create directory for downloading log " +
+                      "segments: {}. Stopping Journal Node Sync.",
+                  journal.getStorage().getEditsSyncDir());
+              return;
+            }
+            continue;
           } else {
             syncJournals();
           }
@@ -233,6 +250,68 @@ private void syncJournals() {
     journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
   }

+  private void formatWithSyncer() {
+    if (!tryFormatting) {
+      return;
+    }
+    LOG.info("Trying to format the journal with the syncer");
+    try {
+      StorageInfo storage = null;
+      for (JournalNodeProxy jnProxy : otherJNProxies) {
+        if (!hasEditLogs(jnProxy)) {
+          // This avoids a race condition between `hdfs namenode -format` and
+          // the JN syncer by checking that the other JN is not newly formatted.
+          continue;
+        }
+        try {
+          HdfsServerProtos.StorageInfoProto storageInfoResponse =
+              jnProxy.jnProxy.getStorageInfo(jid, nameServiceId);
+          storage = PBHelper.convert(
+              storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE
+          );
+          if (storage.getNamespaceID() == 0) {
+            LOG.error("Got invalid StorageInfo from " + jnProxy);
+            storage = null;
+            continue;
+          }
+          LOG.info("Got StorageInfo " + storage + " from " + jnProxy);
+          break;
+        } catch (IOException e) {
+          LOG.error("Could not get StorageInfo from " + jnProxy, e);
+        }
+      }
+      if (storage == null) {
+        LOG.error("Could not get StorageInfo from any JournalNode. " +
+            "JournalNodeSyncer cannot format the journal.");
+        return;
+      }
+      NamespaceInfo nsInfo = new NamespaceInfo(storage);
+      journal.format(nsInfo, true);
+    } catch (IOException e) {
+      LOG.error("Exception in formatting the journal with the syncer", e);
+    }
+  }
+
+  private boolean hasEditLogs(JournalNodeProxy journalProxy) {
+    GetEditLogManifestResponseProto editLogManifest;
+    try {
+      editLogManifest = journalProxy.jnProxy.getEditLogManifestFromJournal(
+          jid, nameServiceId, 0, false);
+    } catch (IOException e) {
+      LOG.error("Could not get edit log manifest from " + journalProxy, e);
+      return false;
+    }
+
+    List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
+        editLogManifest.getManifest()).getLogs();
+    if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
+      LOG.warn("Journal at " + journalProxy.jnAddr + " has no edit logs");
+      return false;
+    }
+
+    return true;
+  }
+
   private void syncWithJournalAtIndex(int index) {
     LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
         + jn.getBoundIpcAddress().getPort() + " with "

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
index 1c78423b40..5510eeb7c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
@@ -31,8 +31,15 @@ package hadoop.hdfs.qjournal;
 import "HdfsServer.proto";
 import "QJournalProtocol.proto";

+message GetStorageInfoRequestProto {
+  required JournalIdProto jid = 1;
+  optional string nameServiceId = 2;
+}
+
 service InterQJournalProtocolService {
   rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto)
       returns (GetEditLogManifestResponseProto);
+
+  rpc getStorageInfo(GetStorageInfoRequestProto)
+      returns (StorageInfoProto);
 }
\ No newline at end of file

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d6fefa4e93..1295c0dca8 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5071,6 +5071,16 @@
 </property>

+<property>
+  <name>dfs.journalnode.enable.sync.format</name>
+  <value>false</value>
+  <description>
+    If true, the JournalNode syncer daemon that syncs edit logs between
+    JournalNodes will also try to format its journal if it is not yet formatted.
+    It queries the other JournalNodes for the storage info required to format.
+  </description>
+</property>
+
 <property>
   <name>dfs.journalnode.edit-cache-size.bytes</name>
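The guard order above matters: a peer that was itself just formatted (say, by a concurrent hdfs namenode -format) has storage info but no edit logs yet, so copying its StorageInfo could race with the NameNode handing out a fresh namespace. The sketch below restates the selection rule standalone; every name in it is hypothetical:

    import java.util.Arrays;
    import java.util.List;

    public class PeerSelectionSketch {
      static class Peer {
        final String addr;
        final boolean hasEditLogs;
        final int namespaceID;

        Peer(String addr, boolean hasEditLogs, int namespaceID) {
          this.addr = addr;
          this.hasEditLogs = hasEditLogs;
          this.namespaceID = namespaceID;
        }
      }

      // The first peer that has edit logs (so it is not freshly formatted) and
      // a non-zero namespaceID (so it is actually formatted) is a safe source.
      static Peer chooseFormatSource(List<Peer> peers) {
        for (Peer p : peers) {
          if (p.hasEditLogs && p.namespaceID != 0) {
            return p;
          }
        }
        return null; // no safe source; the daemon retries next interval
      }

      public static void main(String[] args) {
        List<Peer> peers = Arrays.asList(
            new Peer("jn1:8485", false, 0),  // freshly formatted: skipped
            new Peer("jn2:8485", true, 42)); // established: chosen
        System.out.println(chooseFormatSource(peers).addr); // jn2:8485
      }
    }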
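Worth noting for operators: dfs.journalnode.enable.sync.format has no effect unless the syncer itself is enabled via dfs.journalnode.enable.sync, because formatWithSyncer() is only reached from the syncer daemon. A sketch of how the two flags combine, mirroring the constructor reads above (this assumes DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT is the existing default of true):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class SyncFormatFlags {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        boolean syncEnabled = conf.getBoolean(
            DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
            DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT);
        boolean formatEnabled = conf.getBoolean(
            DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY,
            DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT);
        // Auto-format is active only when both switches are on.
        System.out.println("auto-format active: " + (syncEnabled && formatEnabled));
      }
    }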
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
index 28e36e03bf..ac250ffc4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -20,6 +20,7 @@
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.function.Supplier;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -75,6 +76,7 @@ public void setUpMiniCluster() throws IOException {
     conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
+    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, true);
     if (testName.getMethodName().equals(
         "testSyncAfterJNdowntimeWithoutQJournalQueue")) {
       conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
@@ -478,6 +480,33 @@ public void testSyncDuringRollingUpgrade() throws Exception {
     }
   }

+  @Test(timeout=300_000)
+  public void testFormatWithSyncer() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs
+    long firstTxId = generateEditLog();
+
+    // Delete them from the first JN
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+
+    // Wait to ensure sync starts, then delete the storage directory itself to
+    // simulate a disk wipe and ensure that the in-memory formatting state of
+    // JNStorage gets updated
+    Thread.sleep(2000);
+    FileUtils.deleteDirectory(firstJournalDir);
+    jCluster.getJournalNode(0).getOrCreateJournal(jid).getStorage().analyzeStorage();
+
+    // Wait for the JN to be formatted by the syncer
+    GenericTestUtils.waitFor(jnFormatted(0), 500, 30000);
+    // Generate some more edit logs so that the JN updates its committed tx id
+    generateEditLog();
+    // Check that the missing edit logs have been synced
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
+
   private File deleteEditLog(File currentDir, long startTxId)
       throws IOException {
     EditLogFile logFile = getLogFile(currentDir, startTxId);
@@ -581,4 +610,19 @@ public Boolean get() {
     };
     return supplier;
   }
+
+  private Supplier<Boolean> jnFormatted(int jnIndex) throws Exception {
+    Supplier<Boolean> supplier = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return jCluster.getJournalNode(jnIndex).getOrCreateJournal(jid)
+              .isFormatted();
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    };
+    return supplier;
+  }
 }
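The test leans on GenericTestUtils.waitFor, which polls a Supplier<Boolean> until it returns true or the timeout elapses, then throws a TimeoutException. A self-contained sketch of that polling pattern with a dummy condition (everything here is illustrative and assumes the hadoop-common test artifact is on the classpath):

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.hadoop.test.GenericTestUtils;

    public class WaitForPatternExample {
      public static void main(String[] args) throws Exception {
        AtomicInteger polls = new AtomicInteger();
        // Re-evaluates the supplier every 500 ms, for at most 30 s, the same
        // cadence the test uses for jnFormatted() and editLogExists().
        GenericTestUtils.waitFor(() -> polls.incrementAndGet() >= 3, 500, 30000);
        System.out.println("condition met after " + polls.get() + " polls");
      }
    }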