diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
index bcbb4b96c2..7b03e1f351 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
@@ -20,13 +20,19 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.LongAccumulator;
 
 import org.apache.hadoop.thirdparty.protobuf.ByteString;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
 
 /**
  * Global State Id context for the client.
@@ -77,12 +83,46 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
   @Override
   public synchronized void receiveResponseState(RpcResponseHeaderProto header) {
     if (header.hasRouterFederatedState()) {
-      routerFederatedState = header.getRouterFederatedState();
+      routerFederatedState = mergeRouterFederatedState(
+          this.routerFederatedState, header.getRouterFederatedState());
     } else {
       lastSeenStateId.accumulate(header.getStateId());
     }
   }
 
+  /**
+   * Utility function to parse routerFederatedState field in RPC headers.
+   */
+  public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
+    if (byteString != null) {
+      try {
+        RouterFederatedStateProto federatedState = RouterFederatedStateProto.parseFrom(byteString);
+        return federatedState.getNamespaceStateIdsMap();
+      } catch (InvalidProtocolBufferException e) {
+        // Ignore this exception and will return an empty map
+      }
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Merge state1 and state2 to get the max value for each namespace.
+   * @param state1 input ByteString.
+   * @param state2 input ByteString.
+   * @return one ByteString object which contains the max value of each namespace.
+   */
+  public static ByteString mergeRouterFederatedState(ByteString state1, ByteString state2) {
+    Map<String, Long> mapping1 = new HashMap<>(getRouterFederatedStateMap(state1));
+    Map<String, Long> mapping2 = getRouterFederatedStateMap(state2);
+    mapping2.forEach((k, v) -> {
+      long localValue = mapping1.getOrDefault(k, 0L);
+      mapping1.put(k, Math.max(v, localValue));
+    });
+    RouterFederatedStateProto.Builder federatedBuilder = RouterFederatedStateProto.newBuilder();
+    mapping1.forEach(federatedBuilder::putNamespaceStateIds);
+    return federatedBuilder.build().toByteString();
+  }
+
   /**
    * Client side implementation for providing state alignment info in requests.
    */
@@ -106,4 +146,9 @@ public long receiveRequestState(RpcRequestHeaderProto header, long threshold)
     // Do nothing.
     return 0;
   }
+
+  @VisibleForTesting
+  public ByteString getRouterFederatedState() {
+    return this.routerFederatedState;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index a4d36180c2..e1e7f7d780 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -723,3 +723,15 @@ message BlockTokenSecretProto {
   repeated string storageIds = 8;
   optional bytes handshakeSecret = 9;
 }
+
+/////////////////////////////////////////////////
+// Alignment state for namespaces.
+/////////////////////////////////////////////////
+/**
+ * Clients should receive this message in RPC responses and forward it
+ * in RPC requests without interpreting it. It should be encoded
+ * as an obscure byte array when being sent to clients.
+ */
+message RouterFederatedStateProto {
+  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
index 9d2b75b0b5..a15a0001e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
@@ -28,8 +28,8 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.RetriableException;
@@ -83,10 +83,9 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder)
     if (namespaceIdMap.isEmpty()) {
       return;
     }
-    HdfsServerFederationProtos.RouterFederatedStateProto.Builder federatedStateBuilder =
-        HdfsServerFederationProtos.RouterFederatedStateProto.newBuilder();
-    namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get()));
-    headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
+    RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
+    namespaceIdMap.forEach((k, v) -> builder.putNamespaceStateIds(k, v.get()));
+    headerBuilder.setRouterFederatedState(builder.build().toByteString());
   }
 
   public LongAccumulator getNamespaceStateId(String nsId) {
@@ -102,9 +101,9 @@ public void removeNamespaceStateId(String nsId) {
    */
   public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
     if (byteString != null) {
-      HdfsServerFederationProtos.RouterFederatedStateProto federatedState;
+      RouterFederatedStateProto federatedState;
       try {
-        federatedState = HdfsServerFederationProtos.RouterFederatedStateProto.parseFrom(byteString);
+        federatedState = RouterFederatedStateProto.parseFrom(byteString);
       } catch (InvalidProtocolBufferException e) {
         throw new RuntimeException(e);
       }
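Note (not part of the patch): the sketch below illustrates the merge semantics introduced above, assuming the RouterFederatedStateProto message added to hdfs.proto and the new public static helpers on ClientGSIContext are on the classpath. The namespace names, state IDs, and the FederatedStateMergeExample class name are made up for illustration; the merge keeps the union of namespaces and the maximum state ID seen for each one.

import java.util.Map;

import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
import org.apache.hadoop.thirdparty.protobuf.ByteString;

public class FederatedStateMergeExample {
  public static void main(String[] args) {
    // Federated state as one RPC response might report it: ns0 -> 10, ns1 -> 3.
    ByteString state1 = RouterFederatedStateProto.newBuilder()
        .putNamespaceStateIds("ns0", 10L)
        .putNamespaceStateIds("ns1", 3L)
        .build().toByteString();
    // A second response with an older ns0 value and a namespace not seen before.
    ByteString state2 = RouterFederatedStateProto.newBuilder()
        .putNamespaceStateIds("ns0", 8L)
        .putNamespaceStateIds("ns2", 5L)
        .build().toByteString();

    // mergeRouterFederatedState keeps the per-namespace maximum.
    ByteString merged = ClientGSIContext.mergeRouterFederatedState(state1, state2);
    Map<String, Long> mergedMap = ClientGSIContext.getRouterFederatedStateMap(merged);
    // Expected contents: ns0=10, ns1=3, ns2=5 (iteration order is not guaranteed).
    System.out.println(mergedMap);
  }
}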
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index 7f61d80fe1..c8636826c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -311,17 +311,4 @@ message GetDisabledNameservicesRequestProto {
 
 message GetDisabledNameservicesResponseProto {
   repeated string nameServiceIds = 1;
-}
-
-/////////////////////////////////////////////////
-// Alignment state for namespaces.
-/////////////////////////////////////////////////
-
-/**
- * Clients should receive this message in RPC responses and forward it
- * in RPC requests without interpreting it. It should be encoded
- * as an obscure byte array when being sent to clients.
- */
-message RouterFederatedStateProto {
-  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index 42288bcf53..920c9c4e51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
index e38b0b2a35..23e72546aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
@@ -27,14 +27,18 @@
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientGSIContext;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@@ -43,6 +47,8 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -505,4 +511,38 @@ public void testSingleReadUsingObserverReadProxyProvider() throws Exception {
     // getList call should be sent to observer
     assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
   }
-}
\ No newline at end of file
+
+  @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+  public void testClientReceiveResponseState() {
+    ClientGSIContext clientGSIContext = new ClientGSIContext();
+
+    Map<String, Long> mockMapping = new HashMap<>();
+    mockMapping.put("ns0", 10L);
+    RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
+    mockMapping.forEach(builder::putNamespaceStateIds);
+    RpcHeaderProtos.RpcResponseHeaderProto header = RpcHeaderProtos.RpcResponseHeaderProto
+        .newBuilder()
+        .setCallId(1)
+        .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
+        .setRouterFederatedState(builder.build().toByteString())
+        .build();
+    clientGSIContext.receiveResponseState(header);
+
+    Map<String, Long> mockLowerMapping = new HashMap<>();
+    mockLowerMapping.put("ns0", 8L);
+    builder = RouterFederatedStateProto.newBuilder();
+    mockLowerMapping.forEach(builder::putNamespaceStateIds);
+    header = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder()
+        .setRouterFederatedState(builder.build().toByteString())
+        .setCallId(2)
+        .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
+        .build();
+    clientGSIContext.receiveResponseState(header);
+
+    Map<String, Long> latestFederateState = ClientGSIContext.getRouterFederatedStateMap(
+        clientGSIContext.getRouterFederatedState());
+    Assertions.assertEquals(1, latestFederateState.size());
+    Assertions.assertEquals(10L, latestFederateState.get("ns0"));
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
index 2bc8cfc21b..be8fcf682b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
@@ -19,12 +19,13 @@
 import java.util.HashMap;
 import java.util.Map;
+
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
-import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
 import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.util.ProtoUtil;
 import org.junit.Test;
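Note (not part of the patch): a small usage sketch of the parse helper added to ClientGSIContext, assuming the same classpath as above; the FederatedStateParseExample class name and the byte content are made up. It shows the behavior coded in this diff: a missing or malformed routerFederatedState field degrades to an empty map on the client instead of surfacing an InvalidProtocolBufferException, whereas the router-side RouterStateIdContext helper rethrows it as a RuntimeException.

import java.util.Map;

import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.thirdparty.protobuf.ByteString;

public class FederatedStateParseExample {
  public static void main(String[] args) {
    // A null field parses to an empty map.
    Map<String, Long> fromNull = ClientGSIContext.getRouterFederatedStateMap(null);

    // Bytes that do not decode as a RouterFederatedStateProto also yield an empty map;
    // the InvalidProtocolBufferException is swallowed inside the helper.
    ByteString garbage = ByteString.copyFromUtf8("not a RouterFederatedStateProto");
    Map<String, Long> fromGarbage = ClientGSIContext.getRouterFederatedStateMap(garbage);

    System.out.println(fromNull.isEmpty() && fromGarbage.isEmpty()); // true
  }
}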