From d2e85b0b6d82fef486b08de8a1d04cca1505646f Mon Sep 17 00:00:00 2001
From: Chen Liang
Date: Thu, 13 Sep 2018 16:22:37 -0700
Subject: [PATCH] HDFS-13880. Add mechanism to allow certain RPC calls to
 bypass sync. Contributed by Chen Liang.

---
 .../apache/hadoop/ipc/AlignmentContext.java   | 16 +++++
 .../java/org/apache/hadoop/ipc/Server.java    | 42 ++++++++++--
 .../apache/hadoop/hdfs/ClientGSIContext.java  |  6 ++
 .../hadoop/hdfs/protocol/ClientProtocol.java  | 68 +++++++++----------
 .../hdfs/server/namenode/ha/ReadOnly.java     |  7 ++
 .../server/namenode/GlobalStateIdContext.java | 21 ++++++
 .../server/namenode/ha/TestObserverNode.java  | 52 ++++++++++++++
 7 files changed, 173 insertions(+), 39 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index 0e8b960ecd..a435ff6c4e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -38,6 +38,7 @@ public interface AlignmentContext {
   /**
    * This is the intended server method call to implement to pass state info
    * during RPC response header construction.
+   *
    * @param header The RPC response header builder.
    */
   void updateResponseState(RpcResponseHeaderProto.Builder header);
@@ -45,6 +46,7 @@ public interface AlignmentContext {
   /**
    * This is the intended client method call to implement to receive state info
    * during RPC response processing.
+   *
    * @param header The RPC response header.
    */
   void receiveResponseState(RpcResponseHeaderProto header);
@@ -52,6 +54,7 @@ public interface AlignmentContext {
   /**
    * This is the intended client method call to pull last seen state info
    * into RPC request processing.
+   *
    * @param header The RPC request header builder.
    */
   void updateRequestState(RpcRequestHeaderProto.Builder header);
@@ -59,6 +62,7 @@ public interface AlignmentContext {
   /**
    * This is the intended server method call to implement to receive
    * client state info during RPC request header processing.
+   *
    * @param header The RPC request header.
    * @return the state id in the request header.
    */
@@ -66,7 +70,19 @@ public interface AlignmentContext {

   /**
    * Returns the last seen state id of the alignment context instance.
+   *
    * @return the value of the last seen state id.
    */
   long getLastSeenStateId();
+
+  /**
+   * Returns true if the given method call needs to be synced, i.e. the
+   * server state must have caught up with the client state before the call
+   * is processed; returns false otherwise.
+   *
+   * @param protocolName the name of the protocol
+   * @param method the method call to check
+   * @return true if this method call is coordinated, false otherwise.
+   */
+  boolean isCoordinatedCall(String protocolName, String method);
 }
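Reviewer note: the new isCoordinatedCall hook makes every AlignmentContext implementation decide, per protocol and method name, whether a call must wait for state alignment. For reference, a minimal sketch of a server-side implementation that opts out of coordination entirely; the class name NeverCoordinatedContext and its stateId field are hypothetical and not part of this patch:

import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;

// Hypothetical sketch: a server-side context that treats every call as
// uncoordinated, which preserves the pre-HDFS-13880 behavior.
class NeverCoordinatedContext implements AlignmentContext {
  private volatile long stateId = 0;

  @Override
  public void updateResponseState(RpcResponseHeaderProto.Builder header) {
    header.setStateId(stateId); // publish server state to clients
  }

  @Override
  public void receiveResponseState(RpcResponseHeaderProto header) {
    // server side: responses are sent, not received
  }

  @Override
  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
    // server side: requests are received, not issued
  }

  @Override
  public long receiveRequestState(RpcRequestHeaderProto header) {
    return header.getStateId();
  }

  @Override
  public long getLastSeenStateId() {
    return stateId;
  }

  @Override
  public boolean isCoordinatedCall(String protocolName, String method) {
    return false; // never defer any call
  }
}

GlobalStateIdContext later in this patch is the real server-side implementation; the sketch only shows the shape of the contract.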
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 6892352d71..3068264377 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -727,6 +727,7 @@ public static class Call implements Schedulable,
     private int priorityLevel;
     // the priority level assigned by scheduler, 0 by default
     private long clientStateId;
+    private boolean isCallCoordinated;

     Call() {
       this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
@@ -758,6 +759,7 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
       this.traceScope = traceScope;
       this.callerContext = callerContext;
       this.clientStateId = Long.MIN_VALUE;
+      this.isCallCoordinated = false;
     }

     @Override
@@ -843,6 +845,14 @@ public void setClientStateId(long stateId) {
       this.clientStateId = stateId;
     }

+    public void markCallCoordinated(boolean flag) {
+      this.isCallCoordinated = flag;
+    }
+
+    public boolean isCallCoordinated() {
+      return this.isCallCoordinated;
+    }
+
     @InterfaceStability.Unstable
     public void deferResponse() {
       this.deferredResponse = true;
@@ -2563,9 +2573,31 @@ private void processRpcRequest(RpcRequestHeaderProto header,
       // Save the priority level assignment by the scheduler
       call.setPriorityLevel(callQueue.getPriorityLevel(call));

-      if(alignmentContext != null) {
-        long stateId = alignmentContext.receiveRequestState(header);
-        call.setClientStateId(stateId);
+      if (alignmentContext != null && call.rpcRequest != null &&
+          (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
+        // If call.rpcRequest is not a RpcProtobufRequest, skip this step and
+        // treat the call as uncoordinated, because currently only certain
+        // ClientProtocol methods issued through protobuf RPC need to be
+        // coordinated.
+        String methodName;
+        String protoName;
+        try {
+          ProtobufRpcEngine.RpcProtobufRequest req =
+              (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
+          methodName = req.getRequestHeader().getMethodName();
+          protoName = req.getRequestHeader().getDeclaringClassProtocolName();
+        } catch (IOException ioe) {
+          throw new RpcServerException("RPC request header check failed", ioe);
+        }
+        if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
+          call.markCallCoordinated(false);
+        } else {
+          call.markCallCoordinated(true);
+          long stateId = alignmentContext.receiveRequestState(header);
+          call.setClientStateId(stateId);
+        }
+      } else {
+        call.markCallCoordinated(false);
       }

       try {
@@ -2749,8 +2781,8 @@ public void run() {
       TraceScope traceScope = null;
       try {
         final Call call = callQueue.take(); // pop the queue; maybe blocked here
-        if (alignmentContext != null && call.getClientStateId() >
-            alignmentContext.getLastSeenStateId()) {
+        if (alignmentContext != null && call.isCallCoordinated() &&
+            call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
           /*
            * The call processing should be postponed until the client call's
            * state id is aligned (>=) with the server state id.
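Reviewer note: the hunks above cooperate. processRpcRequest marks a call coordinated only when it is a protobuf request whose protocol and method pass isCoordinatedCall; the handler then defers only coordinated calls whose client state id is ahead of the server. A compressed restatement of the gating predicate (sketch only; it assumes the Call accessors used above are publicly visible, as in trunk):

import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Server;

final class GatingSketch {
  // True iff the handler should requeue the call until the server state
  // catches up; uncoordinated calls, including all non-protobuf requests,
  // never wait.
  static boolean mustPostpone(Server.Call call, AlignmentContext ctx) {
    return ctx != null
        && call.isCallCoordinated()
        && call.getClientStateId() > ctx.getLastSeenStateId();
  }
}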
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
index 10fa0e15e4..6d366a63c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
@@ -44,6 +44,12 @@ public long getLastSeenStateId() {
     return lastSeenStateId.get();
   }

+  @Override
+  public boolean isCoordinatedCall(String protocolName, String method) {
+    throw new UnsupportedOperationException(
+        "Client should not be checking whether calls are coordinated");
+  }
+
   /**
    * Client side implementation only receives state alignment info.
    * It does not provide state alignment info, therefore this does nothing.
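Reviewer note: throwing UnsupportedOperationException is deliberate here. Deciding coordination is a server-side concern; the client half of the context only remembers the largest state id it has seen and echoes it back on the next request. A hypothetical restatement of that flow (ClientStateSketch is an assumed name; the real class wires the same idea into the RPC headers):

import java.util.concurrent.atomic.AtomicLong;

final class ClientStateSketch {
  private final AtomicLong lastSeen = new AtomicLong(Long.MIN_VALUE);

  // Fed with RpcResponseHeaderProto.getStateId() of every response.
  void onResponse(long serverStateId) {
    lastSeen.accumulateAndGet(serverStateId, Math::max);
  }

  // Supplies RpcRequestHeaderProto.setStateId() for the next request.
  long forNextRequest() {
    return lastSeen.get();
  }
}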
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 20e4259985..4e5aa40999 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -129,7 +129,7 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly(atimeAffected = true)
+  @ReadOnly(atimeAffected = true, isCoordinated = true)
   LocatedBlocks getBlockLocations(String src, long offset, long length)
       throws IOException;

@@ -139,7 +139,7 @@ LocatedBlocks getBlockLocations(String src, long offset, long length)
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   FsServerDefaults getServerDefaults() throws IOException;

   /**
@@ -280,7 +280,7 @@ boolean setReplication(String src, short replication)
    * @return All the in-use block storage policies currently.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BlockStoragePolicy[] getStoragePolicies() throws IOException;

   /**
@@ -323,7 +323,7 @@ void setStoragePolicy(String src, String policyName)
    * If file/dir src is not found
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BlockStoragePolicy getStoragePolicy(String path) throws IOException;

   /**
@@ -690,7 +690,7 @@ boolean mkdirs(String src, FsPermission masked, boolean createParent)
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException;

@@ -701,7 +701,7 @@ DirectoryListing getListing(String src, byte[] startAfter,
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   SnapshottableDirectoryStatus[] getSnapshottableDirListing()
       throws IOException;

@@ -829,7 +829,7 @@ DatanodeStorageReport[] getDatanodeStorageReport(
    * a symlink.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   long getPreferredBlockSize(String filename)
       throws IOException;

@@ -984,7 +984,7 @@ RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
    * cookie returned from the previous call.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException;

@@ -1020,7 +1020,7 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   HdfsFileStatus getFileInfo(String src) throws IOException;

   /**
@@ -1035,7 +1035,7 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   boolean isFileClosed(String src) throws IOException;

   /**
@@ -1052,7 +1052,7 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   HdfsFileStatus getFileLinkInfo(String src) throws IOException;

   /**
@@ -1067,7 +1067,7 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   HdfsLocatedFileStatus getLocatedFileInfo(String src,
       boolean needBlockToken) throws IOException;

@@ -1082,7 +1082,7 @@ HdfsLocatedFileStatus getLocatedFileInfo(String src, boolean needBlockToken)
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   ContentSummary getContentSummary(String path) throws IOException;

   /**
@@ -1195,7 +1195,7 @@ void createSymlink(String target, String link, FsPermission dirPerm,
    * or an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   String getLinkTarget(String path) throws IOException;

   /**
@@ -1265,7 +1265,7 @@ void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   DataEncryptionKey getDataEncryptionKey() throws IOException;

   /**
@@ -1334,7 +1334,7 @@ void disallowSnapshot(String snapshotRoot)
    * @throws IOException on error
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
       String fromSnapshot, String toSnapshot) throws IOException;

@@ -1362,7 +1362,7 @@ SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
    * @throws IOException on error
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   SnapshotDiffReportListing getSnapshotDiffReportListing(String snapshotRoot,
       String fromSnapshot, String toSnapshot, byte[] startPath, int index)
       throws IOException;

@@ -1409,7 +1409,7 @@ void modifyCacheDirective(CacheDirectiveInfo directive,
    * @return A batch of CacheDirectiveEntry objects.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
       long prevId, CacheDirectiveInfo filter) throws IOException;

@@ -1451,7 +1451,7 @@ BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
    * @return A batch of CachePoolEntry objects.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<CachePoolEntry> listCachePools(String prevPool)
       throws IOException;

@@ -1498,7 +1498,7 @@ void removeAclEntries(String src, List<AclEntry> aclSpec)
    * Gets the ACLs of files and directories.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   AclStatus getAclStatus(String src) throws IOException;

   /**
@@ -1512,7 +1512,7 @@ void createEncryptionZone(String src, String keyName)
    * Get the encryption zone for a path.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   EncryptionZone getEZForPath(String src)
       throws IOException;
@@ -1524,7 +1524,7 @@ EncryptionZone getEZForPath(String src)
    * @return Batch of encryption zones.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<EncryptionZone> listEncryptionZones(
       long prevId) throws IOException;

@@ -1549,7 +1549,7 @@ void reencryptEncryptionZone(String zone, ReencryptAction action)
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(long prevId)
       throws IOException;

@@ -1583,7 +1583,7 @@ void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
       throws IOException;

@@ -1599,7 +1599,7 @@ List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   List<XAttr> listXAttrs(String src)
       throws IOException;

@@ -1634,7 +1634,7 @@ List<XAttr> listXAttrs(String src)
    * @throws IOException see specific implementation
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly // TODO: after HDFS-13749 is done, change to coordinated call
   void checkAccess(String path, FsAction mode) throws IOException;

   /**
@@ -1643,7 +1643,7 @@ List<XAttr> listXAttrs(String src)
    * the starting point for the inotify event stream.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   long getCurrentEditLogTxid() throws IOException;

   /**
@@ -1651,7 +1651,7 @@ List<XAttr> listXAttrs(String src)
    * transactions for txids equal to or greater than txid.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   EventBatchList getEditsFromTxid(long txid) throws IOException;

   /**
@@ -1709,7 +1709,7 @@ AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   ErasureCodingPolicyInfo[] getErasureCodingPolicies() throws IOException;

   /**
@@ -1718,7 +1718,7 @@ AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   Map<String, String> getErasureCodingCodecs() throws IOException;

   /**
@@ -1729,7 +1729,7 @@ AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException;

   /**
@@ -1769,7 +1769,7 @@ AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
    */
   @Idempotent
   @Deprecated
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;

   /**
@@ -1784,7 +1784,7 @@ AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException;

@@ -1796,7 +1796,7 @@ BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   void msync() throws IOException;

   /**
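Reviewer note: the annotation changes above are mechanical, but their client-visible effect is read-your-own-writes when reads are served by an observer. An illustration, not a test in this patch; it assumes a DistributedFileSystem already configured for observer reads:

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

final class ReadYourOwnWriteExample {
  static void demo(DistributedFileSystem dfs) throws IOException {
    dfs.mkdirs(new Path("/d")); // write, handled by the active NameNode
    // getFileInfo is now @ReadOnly(isCoordinated = true): if this read lands
    // on an observer, the observer holds it until it has applied the mkdirs
    // transaction, so the directory is seen rather than spuriously missing.
    FileStatus st = dfs.getFileStatus(new Path("/d"));
  }
}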
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java
index 1782dcb6d8..1786ce1aef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java
@@ -44,4 +44,11 @@
    * is only available on the active namenode.
    */
   boolean activeOnly() default false;
+
+  /**
+   * @return if true, when processing the RPC call of the target method, the
+   * server will wait until its state has caught up with the client state id
+   * (msync); if false, the method is processed regardless of server state.
+   */
+  boolean isCoordinated() default false;
 }
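Reviewer note: because isCoordinated defaults to false, existing @ReadOnly call sites keep their old semantics until explicitly opted in. A sketch of how the two flavors read side by side; ExampleProtocol is a hypothetical interface, not part of this patch:

import java.io.IOException;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.io.retry.Idempotent;

interface ExampleProtocol {
  @Idempotent
  @ReadOnly(isCoordinated = true) // an observer defers this until caught up
  long getConsistentValue(String path) throws IOException;

  @Idempotent
  @ReadOnly // isCoordinated defaults to false:
  long getBestEffortValue() throws IOException; // served immediately, may lag
}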
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
index 0016692933..ecb9fd3624 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hdfs.server.namenode;

+import java.lang.reflect.Method;
+import java.util.HashSet;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -34,12 +38,23 @@ class GlobalStateIdContext implements AlignmentContext {
   private final FSNamesystem namesystem;

+  private final HashSet<String> coordinatedMethods;
+
   /**
    * Server side constructor.
    * @param namesystem server side state provider
    */
   GlobalStateIdContext(FSNamesystem namesystem) {
     this.namesystem = namesystem;
+    this.coordinatedMethods = new HashSet<>();
+    // For now, only ClientProtocol methods can be coordinated, so scan
+    // ClientProtocol only.
+    for (Method method : ClientProtocol.class.getDeclaredMethods()) {
+      if (method.isAnnotationPresent(ReadOnly.class) &&
+          method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) {
+        coordinatedMethods.add(method.getName());
+      }
+    }
   }

   /**
@@ -92,4 +107,10 @@ public long receiveRequestState(RpcRequestHeaderProto header) {
   public long getLastSeenStateId() {
     return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
   }
+
+  @Override
+  public boolean isCoordinatedCall(String protocolName, String methodName) {
+    return protocolName.equals(ClientProtocol.class.getCanonicalName())
+        && coordinatedMethods.contains(methodName);
+  }
 }
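Reviewer note: since the constructor scans for @ReadOnly(isCoordinated = true) reflectively, the coordinated set tracks ClientProtocol automatically as methods are annotated. The intended semantics, restated as assertions; this is illustrative only, assumes same-package access (GlobalStateIdContext is package-private) and an already-constructed FSNamesystem:

import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.AlignmentContext;

final class CoordinationCheckSketch {
  static void restateSemantics(FSNamesystem namesystem) {
    AlignmentContext ctx = new GlobalStateIdContext(namesystem);
    String proto = ClientProtocol.class.getCanonicalName();
    assert ctx.isCoordinatedCall(proto, "getFileInfo");  // coordinated above
    assert !ctx.isCoordinatedCall(proto, "checkAccess"); // plain @ReadOnly (TODO)
    assert !ctx.isCoordinatedCall("some.other.Protocol", "getFileInfo");
  }
}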
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index 16371b1030..89bfffb449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
@@ -342,6 +343,57 @@ public void testMsyncSimple() throws Exception {
     assertEquals(1, readStatus.get());
   }

+  @Test
+  public void testUncoordinatedCall() throws Exception {
+    // Disable fast tailing so that coordination takes time.
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
+    conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
+    setUpCluster(1);
+    setObserverRead(true);
+
+    // Make a write call so that the client is ahead of
+    // the observer for now.
+    dfs.mkdir(testPath, FsPermission.getDefault());
+
+    // A status flag, initialized to 0; after the reader finishes it is
+    // updated to 1, or -1 on error.
+    AtomicInteger readStatus = new AtomicInteger(0);
+
+    // Create a separate thread to make a blocking read.
+    Thread reader = new Thread(() -> {
+      try {
+        // This read call will block until the server state catches up. But
+        // with the configuration above, that will take a very long time.
+        dfs.getClient().getFileInfo("/");
+        readStatus.set(1);
+        fail("Should have been interrupted before getting here.");
+      } catch (IOException e) {
+        e.printStackTrace();
+        readStatus.set(-1);
+      }
+    });
+    reader.start();
+
+    long before = System.currentTimeMillis();
+    dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    long after = System.currentTimeMillis();
+
+    // Should succeed immediately, because datanodeReport is marked as an
+    // uncoordinated call and will not wait for the server to catch up.
+    assertTrue(after - before < 200);
+    // By this time the reader thread should still be blocking, so the
+    // status is not updated.
+    assertEquals(0, readStatus.get());
+    Thread.sleep(5000);
+    // The reader thread status should still be unchanged after 5 sec...
+    assertEquals(0, readStatus.get());
+    // ...and the reader thread is not dead, so it must still be waiting.
+    assertEquals(Thread.State.WAITING, reader.getState());
+    reader.interrupt();
+  }
+
   private void setUpCluster(int numObservers) throws Exception {
     qjmhaCluster = new MiniQJMHACluster.Builder(conf)
         .setNumNameNodes(2 + numObservers