diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index 58c5ad39b9..4b7e59c51f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -23,7 +23,6 @@ import java.net.URI; import java.net.URL; import java.security.PrivilegedExceptionAction; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -67,6 +66,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncaughtExceptionHandlers; +import org.apache.hadoop.util.Time; /** * Channel to a remote JournalNode using Hadoop IPC. @@ -154,26 +154,15 @@ public class IPCLoggerChannel implements AsyncLogger { private static final long WARN_JOURNAL_MILLIS_THRESHOLD = 1000; - static final Factory FACTORY = new AsyncLogger.Factory() { - @Override - public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, - String journalId, String nameServiceId, InetSocketAddress addr) { - return new IPCLoggerChannel(conf, nsInfo, journalId, nameServiceId, addr); - } - }; + static final Factory FACTORY = IPCLoggerChannel::new; - public IPCLoggerChannel(Configuration conf, - NamespaceInfo nsInfo, - String journalId, - InetSocketAddress addr) { + public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo, + String journalId, InetSocketAddress addr) { this(conf, nsInfo, journalId, null, addr); } - public IPCLoggerChannel(Configuration conf, - NamespaceInfo nsInfo, - String journalId, - String nameServiceId, - InetSocketAddress addr) { + public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo, + String journalId, String nameServiceId, InetSocketAddress addr) { this.conf = conf; this.nsInfo = nsInfo; this.journalId = journalId; @@ -202,7 +191,7 @@ public synchronized void setCommittedTxId(long txid) { "Trying to move committed txid backwards in client " + "old: %s new: %s", committedTxId, txid); this.committedTxId = txid; - this.lastCommitNanos = System.nanoTime(); + this.lastCommitNanos = Time.monotonicNowNanos(); } @Override @@ -229,25 +218,19 @@ protected QJournalProtocol createProxy() throws IOException { final Configuration confCopy = new Configuration(conf); // Need to set NODELAY or else batches larger than MTU can trigger - // 40ms nagling delays. - confCopy.setBoolean( - CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, - true); - + // 40ms nailing delays. + confCopy.setBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, true); RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine2.class); return SecurityUtil.doAsLoginUser( - new PrivilegedExceptionAction() { - @Override - public QJournalProtocol run() throws IOException { - RPC.setProtocolEngine(confCopy, - QJournalProtocolPB.class, ProtobufRpcEngine2.class); - QJournalProtocolPB pbproxy = RPC.getProxy( - QJournalProtocolPB.class, - RPC.getProtocolVersion(QJournalProtocolPB.class), - addr, confCopy); - return new QJournalProtocolTranslatorPB(pbproxy); - } + (PrivilegedExceptionAction) () -> { + RPC.setProtocolEngine(confCopy, + QJournalProtocolPB.class, ProtobufRpcEngine2.class); + QJournalProtocolPB pbproxy = RPC.getProxy( + QJournalProtocolPB.class, + RPC.getProtocolVersion(QJournalProtocolPB.class), + addr, confCopy); + return new QJournalProtocolTranslatorPB(pbproxy); }); } @@ -260,10 +243,8 @@ protected ExecutorService createSingleThreadExecutor() { return Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("Logger channel (from single-thread executor) to " + - addr) - .setUncaughtExceptionHandler( - UncaughtExceptionHandlers.systemExit()) + .setNameFormat("Logger channel (from single-thread executor) to " + addr) + .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit()) .build()); } @@ -308,11 +289,6 @@ private synchronized RequestInfo createReqInfo() { epoch, ipcSerial++, committedTxId); } - @VisibleForTesting - synchronized long getNextIpcSerial() { - return ipcSerial; - } - public synchronized int getQueuedEditsSize() { return queuedEditsSizeBytes; } @@ -333,11 +309,7 @@ public synchronized boolean isOutOfSync() { @VisibleForTesting void waitForAllPendingCalls() throws InterruptedException { try { - singleThreadExecutor.submit(new Runnable() { - @Override - public void run() { - } - }).get(); + singleThreadExecutor.submit(() -> {}).get(); } catch (ExecutionException e) { // This can't happen! throw new AssertionError(e); @@ -346,36 +318,23 @@ public void run() { @Override public ListenableFuture isFormatted() { - return singleThreadExecutor.submit(new Callable() { - @Override - public Boolean call() throws IOException { - return getProxy().isFormatted(journalId, nameServiceId); - } - }); + return singleThreadExecutor.submit(() -> getProxy().isFormatted(journalId, nameServiceId)); } @Override public ListenableFuture getJournalState() { - return singleThreadExecutor.submit(new Callable() { - @Override - public GetJournalStateResponseProto call() throws IOException { - GetJournalStateResponseProto ret = - getProxy().getJournalState(journalId, nameServiceId); - constructHttpServerURI(ret); - return ret; - } + return singleThreadExecutor.submit(() -> { + GetJournalStateResponseProto ret = getProxy().getJournalState(journalId, nameServiceId); + constructHttpServerURI(ret); + return ret; }); } @Override public ListenableFuture newEpoch( final long epoch) { - return singleThreadExecutor.submit(new Callable() { - @Override - public NewEpochResponseProto call() throws IOException { - return getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch); - } - }); + return singleThreadExecutor.submit( + () -> getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch)); } @Override @@ -390,50 +349,43 @@ public ListenableFuture sendEdits( // When this batch is acked, we use its submission time in order // to calculate how far we are lagging. - final long submitNanos = System.nanoTime(); + final long submitNanos = Time.monotonicNowNanos(); ListenableFuture ret = null; try { - ret = singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - throwIfOutOfSync(); + ret = singleThreadExecutor.submit(() -> { + throwIfOutOfSync(); - long rpcSendTimeNanos = System.nanoTime(); - try { - getProxy().journal(createReqInfo(), - segmentTxId, firstTxnId, numTxns, data); - } catch (IOException e) { - QuorumJournalManager.LOG.warn( - "Remote journal " + IPCLoggerChannel.this + " failed to " + - "write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) + - ". Will try to write to this JN again after the next " + - "log roll.", e); - synchronized (IPCLoggerChannel.this) { - outOfSync = true; - } - throw e; - } finally { - long now = System.nanoTime(); - long rpcTime = TimeUnit.MICROSECONDS.convert( - now - rpcSendTimeNanos, TimeUnit.NANOSECONDS); - long endToEndTime = TimeUnit.MICROSECONDS.convert( - now - submitNanos, TimeUnit.NANOSECONDS); - metrics.addWriteEndToEndLatency(endToEndTime); - metrics.addWriteRpcLatency(rpcTime); - if (rpcTime / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) { - QuorumJournalManager.LOG.warn( - "Took " + (rpcTime / 1000) + "ms to send a batch of " + - numTxns + " edits (" + data.length + " bytes) to " + - "remote journal " + IPCLoggerChannel.this); - } - } + final long rpcSendTimeNanos = Time.monotonicNowNanos(); + try { + getProxy().journal(createReqInfo(), segmentTxId, firstTxnId, numTxns, data); + } catch (IOException e) { + QuorumJournalManager.LOG.warn("Remote journal {} failed to write txns {}-{}." + + " Will try to write to this JN again after the next log roll.", + IPCLoggerChannel.this, firstTxnId, (firstTxnId + numTxns - 1), e); synchronized (IPCLoggerChannel.this) { - highestAckedTxId = firstTxnId + numTxns - 1; - lastAckNanos = submitNanos; + outOfSync = true; + } + throw e; + } finally { + final long nowNanos = Time.monotonicNowNanos(); + final long rpcTimeMicros = TimeUnit.MICROSECONDS.convert( + (nowNanos - rpcSendTimeNanos), TimeUnit.NANOSECONDS); + final long endToEndTimeMicros = TimeUnit.MICROSECONDS.convert( + (nowNanos - submitNanos), TimeUnit.NANOSECONDS); + metrics.addWriteEndToEndLatency(endToEndTimeMicros); + metrics.addWriteRpcLatency(rpcTimeMicros); + if (rpcTimeMicros / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) { + QuorumJournalManager.LOG.warn( + "Took {}ms to send a batch of {} edits ({} bytes) to remote journal {}.", + rpcTimeMicros / 1000, numTxns, data.length, IPCLoggerChannel.this); } - return null; } + synchronized (IPCLoggerChannel.this) { + highestAckedTxId = firstTxnId + numTxns - 1; + lastAckNanos = submitNanos; + } + return null; }); } finally { if (ret == null) { @@ -460,14 +412,12 @@ public void onSuccess(Void t) { return ret; } - private void throwIfOutOfSync() - throws JournalOutOfSyncException, IOException { + private void throwIfOutOfSync() throws IOException { if (isOutOfSync()) { // Even if we're out of sync, it's useful to send an RPC // to the remote node in order to update its lag metrics, etc. heartbeatIfNecessary(); - throw new JournalOutOfSyncException( - "Journal disabled until next roll"); + throw new JournalOutOfSyncException("Journal disabled until next roll"); } } @@ -497,12 +447,10 @@ private void heartbeatIfNecessary() throws IOException { private synchronized void reserveQueueSpace(int size) throws LoggerTooFarBehindException { Preconditions.checkArgument(size >= 0); - if (queuedEditsSizeBytes + size > queueSizeLimitBytes && - queuedEditsSizeBytes > 0) { - QuorumJournalManager.LOG.warn("Pending edits to " + IPCLoggerChannel.this - + " is going to exceed limit size: " + queueSizeLimitBytes - + ", current queued edits size: " + queuedEditsSizeBytes - + ", will silently drop " + size + " bytes of edits!"); + if (queuedEditsSizeBytes + size > queueSizeLimitBytes && queuedEditsSizeBytes > 0) { + QuorumJournalManager.LOG.warn("Pending edits to {} is going to exceed limit size: {}" + + ", current queued edits size: {}, will silently drop {} bytes of edits!", + IPCLoggerChannel.class, queueSizeLimitBytes, queuedEditsSizeBytes, size); throw new LoggerTooFarBehindException(); } queuedEditsSizeBytes += size; @@ -514,203 +462,144 @@ private synchronized void unreserveQueueSpace(int size) { } @Override - public ListenableFuture format(final NamespaceInfo nsInfo, - final boolean force) { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws Exception { - getProxy().format(journalId, nameServiceId, nsInfo, force); - return null; - } + public ListenableFuture format(final NamespaceInfo nsInfo, final boolean force) { + return singleThreadExecutor.submit(() -> { + getProxy().format(journalId, nameServiceId, nsInfo, force); + return null; }); } @Override - public ListenableFuture startLogSegment(final long txid, - final int layoutVersion) { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - getProxy().startLogSegment(createReqInfo(), txid, layoutVersion); - synchronized (IPCLoggerChannel.this) { - if (outOfSync) { - outOfSync = false; - QuorumJournalManager.LOG.info( - "Restarting previously-stopped writes to " + - IPCLoggerChannel.this + " in segment starting at txid " + - txid); - } + public ListenableFuture startLogSegment(final long txid, final int layoutVersion) { + return singleThreadExecutor.submit(() -> { + getProxy().startLogSegment(createReqInfo(), txid, layoutVersion); + synchronized (IPCLoggerChannel.this) { + if (outOfSync) { + outOfSync = false; + QuorumJournalManager.LOG.info( + "Restarting previously-stopped writes to {} in segment starting at txid {}.", + IPCLoggerChannel.class, txid); } - return null; } + return null; }); } @Override - public ListenableFuture finalizeLogSegment( - final long startTxId, final long endTxId) { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - throwIfOutOfSync(); - - getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId); - return null; - } + public ListenableFuture finalizeLogSegment(final long startTxId, final long endTxId) { + return singleThreadExecutor.submit(() -> { + throwIfOutOfSync(); + getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId); + return null; }); } @Override public ListenableFuture purgeLogsOlderThan(final long minTxIdToKeep) { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws Exception { - getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep); - return null; - } + return singleThreadExecutor.submit(() -> { + getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep); + return null; }); } @Override public ListenableFuture getJournaledEdits( long fromTxnId, int maxTransactions) { - return parallelExecutor.submit( - new Callable() { - @Override - public GetJournaledEditsResponseProto call() throws IOException { - return getProxy().getJournaledEdits(journalId, nameServiceId, - fromTxnId, maxTransactions); - } - }); + return parallelExecutor.submit(() -> getProxy().getJournaledEdits( + journalId, nameServiceId, fromTxnId, maxTransactions)); } @Override public ListenableFuture getEditLogManifest( final long fromTxnId, final boolean inProgressOk) { - return parallelExecutor.submit(new Callable() { - @Override - public RemoteEditLogManifest call() throws IOException { - GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( - journalId, nameServiceId, fromTxnId, inProgressOk); - // Update the http port, since we need this to build URLs to any of the - // returned logs. + return parallelExecutor.submit(() -> { + GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( + journalId, nameServiceId, fromTxnId, inProgressOk); + // Update the http port, since we need this to build URLs to any of the + // returned logs. + constructHttpServerURI(ret); + return PBHelper.convert(ret.getManifest()); + }); + } + + @Override + public ListenableFuture prepareRecovery(final long segmentTxId) { + return singleThreadExecutor.submit(() -> { + if (!hasHttpServerEndPoint()) { + // force an RPC call, so we know what the HTTP port should be if it + // hasn't done so. + GetJournalStateResponseProto ret = getProxy().getJournalState( + journalId, nameServiceId); constructHttpServerURI(ret); - return PBHelper.convert(ret.getManifest()); } + return getProxy().prepareRecovery(createReqInfo(), segmentTxId); }); } @Override - public ListenableFuture prepareRecovery( - final long segmentTxId) { - return singleThreadExecutor.submit(new Callable() { - @Override - public PrepareRecoveryResponseProto call() throws IOException { - if (!hasHttpServerEndPoint()) { - // force an RPC call so we know what the HTTP port should be if it - // haven't done so. - GetJournalStateResponseProto ret = getProxy().getJournalState( - journalId, nameServiceId); - constructHttpServerURI(ret); - } - return getProxy().prepareRecovery(createReqInfo(), segmentTxId); - } - }); - } - - @Override - public ListenableFuture acceptRecovery( - final SegmentStateProto log, final URL url) { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - getProxy().acceptRecovery(createReqInfo(), log, url); - return null; - } + public ListenableFuture acceptRecovery(final SegmentStateProto log, final URL url) { + return singleThreadExecutor.submit(() -> { + getProxy().acceptRecovery(createReqInfo(), log, url); + return null; }); } @Override public ListenableFuture doPreUpgrade() { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - getProxy().doPreUpgrade(journalId); - return null; - } + return singleThreadExecutor.submit(() -> { + getProxy().doPreUpgrade(journalId); + return null; }); } @Override public ListenableFuture doUpgrade(final StorageInfo sInfo) { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - getProxy().doUpgrade(journalId, sInfo); - return null; - } + return singleThreadExecutor.submit(() -> { + getProxy().doUpgrade(journalId, sInfo); + return null; }); } @Override public ListenableFuture doFinalize() { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - getProxy().doFinalize(journalId, nameServiceId); - return null; - } + return singleThreadExecutor.submit(() -> { + getProxy().doFinalize(journalId, nameServiceId); + return null; }); } @Override public ListenableFuture canRollBack(final StorageInfo storage, final StorageInfo prevStorage, final int targetLayoutVersion) { - return singleThreadExecutor.submit(new Callable() { - @Override - public Boolean call() throws IOException { - return getProxy().canRollBack(journalId, nameServiceId, - storage, prevStorage, targetLayoutVersion); - } - }); + return singleThreadExecutor.submit( + () -> getProxy().canRollBack(journalId, nameServiceId, + storage, prevStorage, targetLayoutVersion)); } @Override public ListenableFuture doRollback() { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - getProxy().doRollback(journalId, nameServiceId); - return null; - } + return singleThreadExecutor.submit(() -> { + getProxy().doRollback(journalId, nameServiceId); + return null; }); } @Override public ListenableFuture discardSegments(final long startTxId) { - return singleThreadExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - getProxy().discardSegments(journalId, nameServiceId, startTxId); - return null; - } + return singleThreadExecutor.submit(() -> { + getProxy().discardSegments(journalId, nameServiceId, startTxId); + return null; }); } @Override public ListenableFuture getJournalCTime() { - return singleThreadExecutor.submit(new Callable() { - @Override - public Long call() throws IOException { - return getProxy().getJournalCTime(journalId, nameServiceId); - } - }); + return singleThreadExecutor.submit(() -> getProxy().getJournalCTime(journalId, nameServiceId)); } @Override public String toString() { - return InetAddresses.toAddrString(addr.getAddress()) + ':' + - addr.getPort(); + return InetAddresses.toAddrString(addr.getAddress()) + ':' + addr.getPort(); } @Override @@ -778,5 +667,4 @@ private URL getHttpServerURI(String scheme, int port) { private boolean hasHttpServerEndPoint() { return httpServerURL != null; } - }