HDFS-16669: Enhance client protocol to propagate last seen state IDs for multiple nameservices.
Fixes #4584 Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
This commit is contained in:
parent
6fbc38db95
commit
a3b1bafa34
@ -46,7 +46,7 @@ public interface AlignmentContext {
|
||||
void updateResponseState(RpcResponseHeaderProto.Builder header);
|
||||
|
||||
/**
|
||||
* This is the intended client method call to implement to recieve state info
|
||||
* This is the intended client method call to implement to receive state info
|
||||
* during RPC response processing.
|
||||
*
|
||||
* @param header The RPC response header.
|
||||
|
@ -925,7 +925,7 @@ public static class Call implements Schedulable,
|
||||
private volatile String detailedMetricsName = "";
|
||||
final int callId; // the client's call id
|
||||
final int retryCount; // the retry count of the call
|
||||
long timestampNanos; // time the call was received
|
||||
private final long timestampNanos; // time the call was received
|
||||
long responseTimestampNanos; // time the call was served
|
||||
private AtomicInteger responseWaitCount = new AtomicInteger(1);
|
||||
final RPC.RpcKind rpcKind;
|
||||
@ -1107,6 +1107,10 @@ public void setDeferredResponse(Writable response) {
|
||||
|
||||
public void setDeferredError(Throwable t) {
|
||||
}
|
||||
|
||||
public long getTimestampNanos() {
|
||||
return timestampNanos;
|
||||
}
|
||||
}
|
||||
|
||||
/** A RPC extended call queued for handling. */
|
||||
@ -1188,7 +1192,7 @@ public Void run() throws Exception {
|
||||
|
||||
try {
|
||||
value = call(
|
||||
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
|
||||
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
|
||||
} catch (Throwable e) {
|
||||
populateResponseParamsOnError(e, responseParams);
|
||||
}
|
||||
|
@ -91,6 +91,10 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
|
||||
optional RPCTraceInfoProto traceInfo = 6; // tracing info
|
||||
optional RPCCallerContextProto callerContext = 7; // call context
|
||||
optional int64 stateId = 8; // The last seen Global State ID
|
||||
// Alignment context info for use with routers.
|
||||
// The client should not interpret these bytes, but only forward bytes
|
||||
// received from RpcResponseHeaderProto.routerFederatedState.
|
||||
optional bytes routerFederatedState = 9;
|
||||
}
|
||||
|
||||
|
||||
@ -157,6 +161,10 @@ message RpcResponseHeaderProto {
|
||||
optional bytes clientId = 7; // Globally unique client ID
|
||||
optional sint32 retryCount = 8 [default = -1];
|
||||
optional int64 stateId = 9; // The last written Global State ID
|
||||
// Alignment context info for use with routers.
|
||||
// The client should not interpret these bytes, but only
|
||||
// forward them to the router using RpcRequestHeaderProto.routerFederatedState.
|
||||
optional bytes routerFederatedState = 10;
|
||||
}
|
||||
|
||||
message RpcSaslProto {
|
||||
|
@ -312,3 +312,16 @@ message GetDisabledNameservicesRequestProto {
|
||||
message GetDisabledNameservicesResponseProto {
|
||||
repeated string nameServiceIds = 1;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////
|
||||
// Alignment state for namespaces.
|
||||
/////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Clients should receive this message in RPC responses and forward it
|
||||
* in RPC requests without interpreting it. It should be encoded
|
||||
* as an obscure byte array when being sent to clients.
|
||||
*/
|
||||
message RouterFederatedStateProto {
|
||||
map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
|
||||
}
|
@ -0,0 +1,99 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.ipc.AlignmentContext;
|
||||
import org.apache.hadoop.ipc.ClientId;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcConstants;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
|
||||
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
|
||||
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
|
||||
import org.apache.hadoop.util.ProtoUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
||||
public class TestRouterFederatedState {
|
||||
|
||||
@Test
|
||||
public void testRpcRouterFederatedState() throws InvalidProtocolBufferException {
|
||||
byte[] uuid = ClientId.getClientId();
|
||||
Map<String, Long> expectedStateIds = new HashMap<String, Long>() {{
|
||||
put("namespace1", 11L );
|
||||
put("namespace2", 22L);
|
||||
}};
|
||||
|
||||
AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds);
|
||||
|
||||
RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
||||
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0,
|
||||
RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext);
|
||||
|
||||
Map<String, Long> stateIdsFromHeader =
|
||||
RouterFederatedStateProto.parseFrom(
|
||||
header.getRouterFederatedState().toByteArray()
|
||||
).getNamespaceStateIdsMap();
|
||||
|
||||
assertEquals(expectedStateIds, stateIdsFromHeader);
|
||||
}
|
||||
|
||||
private static class AlignmentContextWithRouterState implements AlignmentContext {
|
||||
|
||||
Map<String, Long> routerFederatedState;
|
||||
|
||||
public AlignmentContextWithRouterState(Map<String, Long> namespaceStates) {
|
||||
this.routerFederatedState = namespaceStates;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
|
||||
RouterFederatedStateProto fedState = RouterFederatedStateProto
|
||||
.newBuilder()
|
||||
.putAllNamespaceStateIds(routerFederatedState)
|
||||
.build();
|
||||
|
||||
header.setRouterFederatedState(fedState.toByteString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {}
|
||||
|
||||
@Override
|
||||
public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {}
|
||||
|
||||
@Override
|
||||
public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastSeenStateId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCoordinatedCall(String protocolName, String method) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user