YARN-5162. Fix Exceptions thrown during in registerAM call when Distributed Scheduling is Enabled (Hitesh Sharma via asuresh)

This commit is contained in:
Arun Suresh 2016-05-26 13:22:33 -07:00
parent 04ded558b0
commit 5b41b288d0
5 changed files with 32 additions and 7 deletions

View File

@ -41,7 +41,7 @@ public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
@Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol.equals(ApplicationMasterProtocolPB.class)) {
if (!ApplicationMasterProtocolPB.class.isAssignableFrom(protocol)) {
return null;
}
return new TokenInfo() {

View File

@ -21,16 +21,16 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol.DistributedSchedulerProtocolService;
@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
protocolVersion = 1)
public interface DistributedSchedulerProtocolPB extends
DistributedSchedulerProtocol.DistributedSchedulerProtocolService.BlockingInterface,
ApplicationMasterProtocolService.BlockingInterface {
DistributedSchedulerProtocolService.BlockingInterface,
ApplicationMasterProtocolService.BlockingInterface,
ApplicationMasterProtocolPB {
}

View File

@ -94,9 +94,13 @@ private synchronized void mergeLocalToBuilder() {
ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
}
if (this.minAllocatableCapability != null) {
builder.setMaxAllocCapability(
builder.setMinAllocCapability(
ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
}
if (this.incrAllocatableCapability != null) {
builder.setIncrAllocCapability(
ProtoUtils.convertToProtoFormat(this.incrAllocatableCapability));
}
if (this.registerApplicationMasterResponse != null) {
builder.setRegisterResponse(
((RegisterApplicationMasterResponsePBImpl)

View File

@ -34,5 +34,6 @@ import "yarn_server_common_service_protos.proto";
service DistributedSchedulerProtocolService {
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
}

View File

@ -35,6 +35,7 @@
.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -132,6 +133,9 @@ public AllocateResponse allocate(AllocateRequest request) throws
DistSchedRegisterResponse resp = factory.newRecordInstance(
DistSchedRegisterResponse.class);
resp.setContainerIdStart(54321l);
resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
return resp;
}
@ -194,6 +198,13 @@ public AllocateResponse allocate(AllocateRequest request) throws
.newRecordInstance(RegisterApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
Assert.assertEquals(4,
dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
Assert.assertEquals(1024,
dsRegResp.getMinAllocatableCapabilty().getMemory());
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
@ -201,5 +212,14 @@ public AllocateResponse allocate(AllocateRequest request) throws
.newRecordInstance(AllocateRequest.class)).getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
FinishApplicationMasterResponse dsfinishResp =
new FinishApplicationMasterResponsePBImpl(
dsProxy.finishApplicationMaster(null,
((FinishApplicationMasterRequestPBImpl) factory
.newRecordInstance(FinishApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(
false, dsfinishResp.getIsUnregistered());
}
}