HDFS-16669: Enhance client protocol to propagate last seen state IDs for multiple nameservices.
Fixes #4584 Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
This commit is contained in:
parent
4138661010
commit
e28dc524f6
@ -46,7 +46,7 @@ public interface AlignmentContext {
|
|||||||
void updateResponseState(RpcResponseHeaderProto.Builder header);
|
void updateResponseState(RpcResponseHeaderProto.Builder header);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the intended client method call to implement to recieve state info
|
* This is the intended client method call to implement to receive state info
|
||||||
* during RPC response processing.
|
* during RPC response processing.
|
||||||
*
|
*
|
||||||
* @param header The RPC response header.
|
* @param header The RPC response header.
|
||||||
|
@ -820,7 +820,7 @@ public static class Call implements Schedulable,
|
|||||||
private volatile String detailedMetricsName = "";
|
private volatile String detailedMetricsName = "";
|
||||||
final int callId; // the client's call id
|
final int callId; // the client's call id
|
||||||
final int retryCount; // the retry count of the call
|
final int retryCount; // the retry count of the call
|
||||||
long timestampNanos; // time the call was received
|
private final long timestampNanos; // time the call was received
|
||||||
long responseTimestampNanos; // time the call was served
|
long responseTimestampNanos; // time the call was served
|
||||||
private AtomicInteger responseWaitCount = new AtomicInteger(1);
|
private AtomicInteger responseWaitCount = new AtomicInteger(1);
|
||||||
final RPC.RpcKind rpcKind;
|
final RPC.RpcKind rpcKind;
|
||||||
@ -1002,6 +1002,10 @@ public void setDeferredResponse(Writable response) {
|
|||||||
|
|
||||||
public void setDeferredError(Throwable t) {
|
public void setDeferredError(Throwable t) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getTimestampNanos() {
|
||||||
|
return timestampNanos;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** A RPC extended call queued for handling. */
|
/** A RPC extended call queued for handling. */
|
||||||
@ -1083,7 +1087,7 @@ public Void run() throws Exception {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
value = call(
|
value = call(
|
||||||
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
|
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
populateResponseParamsOnError(e, responseParams);
|
populateResponseParamsOnError(e, responseParams);
|
||||||
}
|
}
|
||||||
|
@ -91,6 +91,10 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
|
|||||||
optional RPCTraceInfoProto traceInfo = 6; // tracing info
|
optional RPCTraceInfoProto traceInfo = 6; // tracing info
|
||||||
optional RPCCallerContextProto callerContext = 7; // call context
|
optional RPCCallerContextProto callerContext = 7; // call context
|
||||||
optional int64 stateId = 8; // The last seen Global State ID
|
optional int64 stateId = 8; // The last seen Global State ID
|
||||||
|
// Alignment context info for use with routers.
|
||||||
|
// The client should not interpret these bytes, but only forward bytes
|
||||||
|
// received from RpcResponseHeaderProto.routerFederatedState.
|
||||||
|
optional bytes routerFederatedState = 9;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -157,6 +161,10 @@ message RpcResponseHeaderProto {
|
|||||||
optional bytes clientId = 7; // Globally unique client ID
|
optional bytes clientId = 7; // Globally unique client ID
|
||||||
optional sint32 retryCount = 8 [default = -1];
|
optional sint32 retryCount = 8 [default = -1];
|
||||||
optional int64 stateId = 9; // The last written Global State ID
|
optional int64 stateId = 9; // The last written Global State ID
|
||||||
|
// Alignment context info for use with routers.
|
||||||
|
// The client should not interpret these bytes, but only
|
||||||
|
// forward them to the router using RpcRequestHeaderProto.routerFederatedState.
|
||||||
|
optional bytes routerFederatedState = 10;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RpcSaslProto {
|
message RpcSaslProto {
|
||||||
|
@ -306,3 +306,16 @@ message GetDisabledNameservicesRequestProto {
|
|||||||
message GetDisabledNameservicesResponseProto {
|
message GetDisabledNameservicesResponseProto {
|
||||||
repeated string nameServiceIds = 1;
|
repeated string nameServiceIds = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////
|
||||||
|
// Alignment state for namespaces.
|
||||||
|
/////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clients should receive this message in RPC responses and forward it
|
||||||
|
* in RPC requests without interpreting it. It should be encoded
|
||||||
|
* as an obscure byte array when being sent to clients.
|
||||||
|
*/
|
||||||
|
message RouterFederatedStateProto {
|
||||||
|
map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
|
||||||
|
}
|
@ -0,0 +1,99 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.hadoop.ipc.AlignmentContext;
|
||||||
|
import org.apache.hadoop.ipc.ClientId;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ipc.RpcConstants;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
|
||||||
|
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
|
||||||
|
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
|
||||||
|
import org.apache.hadoop.util.ProtoUtil;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestRouterFederatedState {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRpcRouterFederatedState() throws InvalidProtocolBufferException {
|
||||||
|
byte[] uuid = ClientId.getClientId();
|
||||||
|
Map<String, Long> expectedStateIds = new HashMap<String, Long>() {{
|
||||||
|
put("namespace1", 11L );
|
||||||
|
put("namespace2", 22L);
|
||||||
|
}};
|
||||||
|
|
||||||
|
AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds);
|
||||||
|
|
||||||
|
RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
||||||
|
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0,
|
||||||
|
RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext);
|
||||||
|
|
||||||
|
Map<String, Long> stateIdsFromHeader =
|
||||||
|
RouterFederatedStateProto.parseFrom(
|
||||||
|
header.getRouterFederatedState().toByteArray()
|
||||||
|
).getNamespaceStateIdsMap();
|
||||||
|
|
||||||
|
assertEquals(expectedStateIds, stateIdsFromHeader);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class AlignmentContextWithRouterState implements AlignmentContext {
|
||||||
|
|
||||||
|
Map<String, Long> routerFederatedState;
|
||||||
|
|
||||||
|
public AlignmentContextWithRouterState(Map<String, Long> namespaceStates) {
|
||||||
|
this.routerFederatedState = namespaceStates;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
|
||||||
|
RouterFederatedStateProto fedState = RouterFederatedStateProto
|
||||||
|
.newBuilder()
|
||||||
|
.putAllNamespaceStateIds(routerFederatedState)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
header.setRouterFederatedState(fedState.toByteString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLastSeenStateId() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCoordinatedCall(String protocolName, String method) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user