YARN-1449. AM-NM protocol changes to support container resizing. Contributed by Meng Ding & Wangda Tan)

This commit is contained in:
Jian He 2015-07-14 16:06:25 -07:00 committed by Wangda Tan
parent dfe2cb849f
commit 83a18add10
19 changed files with 910 additions and 4 deletions

View File

@ -30,6 +30,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@ -449,5 +451,14 @@ public StopContainersResponse stopContainers(StopContainersRequest request)
"Dummy function cause"));
throw new IOException(e);
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws IOException,
IOException {
Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
throw new IOException(e);
}
}
}

View File

@ -46,6 +46,8 @@
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@ -453,6 +455,13 @@ public GetContainerStatusesResponse getContainerStatuses(
return null;
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException {
return null;
}
@Override
public void close() throws IOException {
}

View File

@ -203,6 +203,9 @@ Release 2.8.0 - UNRELEASED
YARN-3866. AM-RM protocol changes to support container resizing. (Meng Ding
via jianhe)
YARN-1449. AM-NM protocol changes to support container resizing.
(Meng Ding & Wangda Tan via jianhe)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -22,6 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -38,9 +41,9 @@
/**
* <p>The protocol between an <code>ApplicationMaster</code> and a
* <code>NodeManager</code> to start/stop containers and to get status
* of running containers.</p>
*
* <code>NodeManager</code> to start/stop and increase resource of containers
* and to get status of running containers.</p>
*
* <p>If security is enabled the <code>NodeManager</code> verifies that the
* <code>ApplicationMaster</code> has truly been allocated the container
* by the <code>ResourceManager</code> and also verifies all interactions such
@ -170,4 +173,25 @@ StopContainersResponse stopContainers(StopContainersRequest request)
GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws YarnException,
IOException;
/**
* <p>
* The API used by the <code>ApplicationMaster</code> to request for
* resource increase of running containers on the <code>NodeManager</code>.
* </p>
*
* @param request
* request to increase resource of a list of containers
* @return response which includes a list of containerIds of containers
* whose resource has been successfully increased and a
* containerId-to-exception map for failed requests.
*
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException;
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>The request sent by <code>Application Master</code> to the
* <code>Node Manager</code> to change the resource quota of a container.</p>
*
* @see ContainerManagementProtocol#increaseContainersResource(IncreaseContainersResourceRequest)
*/
@Public
@Unstable
public abstract class IncreaseContainersResourceRequest {
@Public
@Unstable
public static IncreaseContainersResourceRequest newInstance(
List<Token> containersToIncrease) {
IncreaseContainersResourceRequest request =
Records.newRecord(IncreaseContainersResourceRequest.class);
request.setContainersToIncrease(containersToIncrease);
return request;
}
/**
* Get a list of container tokens to be used for authorization during
* container resource increase.
* <p>
* Note: {@link NMToken} will be used for authenticating communication with
* {@code NodeManager}.
* @return the list of container tokens to be used for authorization during
* container resource increase.
* @see NMToken
*/
@Public
@Unstable
public abstract List<Token> getContainersToIncrease();
/**
* Set container tokens to be used during container resource increase.
* The token is acquired from
* <code>AllocateResponse.getIncreasedContainers</code>.
* The token contains the container id and resource capability required for
* container resource increase.
* @param containersToIncrease the list of container tokens to be used
* for container resource increase.
*/
@Public
@Unstable
public abstract void setContainersToIncrease(
List<Token> containersToIncrease);
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
import java.util.Map;
/**
* <p>
* The response sent by the <code>NodeManager</code> to the
* <code>ApplicationMaster</code> when asked to increase container resource.
* </p>
*
* @see ContainerManagementProtocol#increaseContainersResource(IncreaseContainersResourceRequest)
*/
@Public
@Unstable
public abstract class IncreaseContainersResourceResponse {
@Private
@Unstable
public static IncreaseContainersResourceResponse newInstance(
List<ContainerId> successfullyIncreasedContainers,
Map<ContainerId, SerializedException> failedRequests) {
IncreaseContainersResourceResponse response =
Records.newRecord(IncreaseContainersResourceResponse.class);
response.setSuccessfullyIncreasedContainers(
successfullyIncreasedContainers);
response.setFailedRequests(failedRequests);
return response;
}
/**
* Get the list of containerIds of containers whose resource
* have been successfully increased.
*
* @return the list of containerIds of containers whose resource have
* been successfully increased.
*/
@Public
@Unstable
public abstract List<ContainerId> getSuccessfullyIncreasedContainers();
/**
* Set the list of containerIds of containers whose resource have
* been successfully increased.
*/
@Private
@Unstable
public abstract void setSuccessfullyIncreasedContainers(
List<ContainerId> succeedIncreasedContainers);
/**
* Get the containerId-to-exception map in which the exception indicates
* error from each container for failed requests.
*/
@Public
@Unstable
public abstract Map<ContainerId, SerializedException> getFailedRequests();
/**
* Set the containerId-to-exception map in which the exception indicates
* error from each container for failed requests.
*/
@Private
@Unstable
public abstract void setFailedRequests(
Map<ContainerId, SerializedException> failedRequests);
}

View File

@ -34,4 +34,5 @@ service ContainerManagementProtocolService {
rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto);
rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto);
rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto);
rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto);
}

View File

@ -30,12 +30,16 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@ -48,6 +52,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
import com.google.protobuf.ServiceException;
@ -128,4 +133,19 @@ public GetContainerStatusesResponse getContainerStatuses(
return null;
}
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException {
IncreaseContainersResourceRequestProto requestProto =
((IncreaseContainersResourceRequestPBImpl)request).getProto();
try {
return new IncreaseContainersResourceResponsePBImpl(
proxy.increaseContainersResource(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}

View File

@ -23,9 +23,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@ -33,6 +36,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
@ -94,4 +99,21 @@ public GetContainerStatusesResponseProto getContainerStatuses(
throw new ServiceException(e);
}
}
@Override
public IncreaseContainersResourceResponseProto increaseContainersResource(
RpcController controller, IncreaseContainersResourceRequestProto proto)
throws ServiceException {
IncreaseContainersResourceRequestPBImpl request =
new IncreaseContainersResourceRequestPBImpl(proto);
try {
IncreaseContainersResourceResponse response =
real.increaseContainersResource(request);
return ((IncreaseContainersResourceResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProtoOrBuilder;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class IncreaseContainersResourceRequestPBImpl extends
IncreaseContainersResourceRequest {
IncreaseContainersResourceRequestProto proto =
IncreaseContainersResourceRequestProto.getDefaultInstance();
IncreaseContainersResourceRequestProto.Builder builder = null;
boolean viaProto = false;
private List<Token> containersToIncrease = null;
public IncreaseContainersResourceRequestPBImpl() {
builder = IncreaseContainersResourceRequestProto.newBuilder();
}
public IncreaseContainersResourceRequestPBImpl(
IncreaseContainersResourceRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public IncreaseContainersResourceRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void mergeLocalToBuilder() {
if (this.containersToIncrease != null) {
addIncreaseContainersToProto();
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = IncreaseContainersResourceRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public List<Token> getContainersToIncrease() {
if (containersToIncrease != null) {
return containersToIncrease;
}
IncreaseContainersResourceRequestProtoOrBuilder p =
viaProto ? proto : builder;
List<TokenProto> list = p.getIncreaseContainersList();
containersToIncrease = new ArrayList<>();
for (TokenProto c : list) {
containersToIncrease.add(convertFromProtoFormat(c));
}
return containersToIncrease;
}
@Override
public void setContainersToIncrease(List<Token> containersToIncrease) {
maybeInitBuilder();
if (containersToIncrease == null) {
builder.clearIncreaseContainers();
}
this.containersToIncrease = containersToIncrease;
}
private void addIncreaseContainersToProto() {
maybeInitBuilder();
builder.clearIncreaseContainers();
if (this.containersToIncrease == null) {
return;
}
Iterable<TokenProto> iterable = new Iterable<TokenProto>() {
@Override
public Iterator<TokenProto> iterator() {
return new Iterator<TokenProto>() {
Iterator<Token> iter = containersToIncrease.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public TokenProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllIncreaseContainers(iterable);
}
private Token convertFromProtoFormat(TokenProto p) {
return new TokenPBImpl(p);
}
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl) t).getProto();
}
}

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerExceptionMapProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class IncreaseContainersResourceResponsePBImpl extends
IncreaseContainersResourceResponse {
IncreaseContainersResourceResponseProto proto =
IncreaseContainersResourceResponseProto.getDefaultInstance();
IncreaseContainersResourceResponseProto.Builder builder = null;
boolean viaProto = false;
private List<ContainerId> succeededRequests = null;
private Map<ContainerId, SerializedException> failedRequests = null;
public IncreaseContainersResourceResponsePBImpl() {
builder = IncreaseContainersResourceResponseProto.newBuilder();
}
public IncreaseContainersResourceResponsePBImpl(
IncreaseContainersResourceResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public IncreaseContainersResourceResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void mergeLocalToBuilder() {
if (this.succeededRequests != null) {
addSucceededRequestsToProto();
}
if (this.failedRequests != null) {
addFailedRequestsToProto();
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = IncreaseContainersResourceResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public List<ContainerId> getSuccessfullyIncreasedContainers() {
initSucceededRequests();
return this.succeededRequests;
}
@Override
public void setSuccessfullyIncreasedContainers(
List<ContainerId> succeededRequests) {
maybeInitBuilder();
if (succeededRequests == null) {
builder.clearSucceededRequests();
}
this.succeededRequests = succeededRequests;
}
private void initSucceededRequests() {
if (this.succeededRequests != null) {
return;
}
IncreaseContainersResourceResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<ContainerIdProto> list = p.getSucceededRequestsList();
this.succeededRequests = new ArrayList<ContainerId>();
for (ContainerIdProto c : list) {
this.succeededRequests.add(convertFromProtoFormat(c));
}
}
private void addSucceededRequestsToProto() {
maybeInitBuilder();
builder.clearSucceededRequests();
if (this.succeededRequests == null) {
return;
}
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
@Override
public Iterator<ContainerIdProto> iterator() {
return new Iterator<ContainerIdProto>() {
Iterator<ContainerId> iter = succeededRequests.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllSucceededRequests(iterable);
}
@Override
public Map<ContainerId, SerializedException> getFailedRequests() {
initFailedRequests();
return this.failedRequests;
}
@Override
public void setFailedRequests(
Map<ContainerId, SerializedException> failedRequests) {
maybeInitBuilder();
if (failedRequests == null) {
builder.clearFailedRequests();
}
this.failedRequests = failedRequests;
}
private void initFailedRequests() {
if (this.failedRequests != null) {
return;
}
IncreaseContainersResourceResponseProtoOrBuilder
p = viaProto ? proto : builder;
List<ContainerExceptionMapProto> protoList = p.getFailedRequestsList();
this.failedRequests = new HashMap<ContainerId, SerializedException>();
for (ContainerExceptionMapProto ce : protoList) {
this.failedRequests.put(convertFromProtoFormat(ce.getContainerId()),
convertFromProtoFormat(ce.getException()));
}
}
private void addFailedRequestsToProto() {
maybeInitBuilder();
builder.clearFailedRequests();
if (this.failedRequests == null) {
return;
}
List<ContainerExceptionMapProto> protoList =
new ArrayList<ContainerExceptionMapProto>();
for (Map.Entry<ContainerId, SerializedException> entry : this.failedRequests
.entrySet()) {
protoList.add(ContainerExceptionMapProto.newBuilder()
.setContainerId(convertToProtoFormat(entry.getKey()))
.setException(convertToProtoFormat(entry.getValue())).build());
}
builder.addAllFailedRequests(protoList);
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl) t).getProto();
}
private SerializedExceptionPBImpl convertFromProtoFormat(
SerializedExceptionProto p) {
return new SerializedExceptionPBImpl(p);
}
private SerializedExceptionProto convertToProtoFormat(SerializedException t) {
return ((SerializedExceptionPBImpl) t).getProto();
}
}

View File

@ -31,6 +31,8 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -166,5 +168,11 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesResponse.newInstance(list, null);
return null;
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
}
}
}

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
/*
* Test that the container resource increase rpc times out properly.
* This is used by AM to increase container resource.
*/
public class TestContainerResourceIncreaseRPC {
static final Log LOG = LogFactory.getLog(
TestContainerResourceIncreaseRPC.class);
@Test
public void testHadoopProtoRPCTimeout() throws Exception {
testRPCTimeout(HadoopYarnProtoRPC.class.getName());
}
private void testRPCTimeout(String rpcClass) throws Exception {
Configuration conf = new Configuration();
// set timeout low for the test
conf.setInt("yarn.rpc.nm-command-timeout", 3000);
conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
Server server = rpc.getServer(ContainerManagementProtocol.class,
new DummyContainerManager(), addr, conf, null, 1);
server.start();
try {
ContainerManagementProtocol proxy =
(ContainerManagementProtocol) rpc.getProxy(
ContainerManagementProtocol.class,
server.getListenerAddress(), conf);
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 0);
ContainerId containerId =
ContainerId.newContainerId(applicationAttemptId, 100);
NodeId nodeId = NodeId.newInstance("localhost", 1234);
Resource resource = Resource.newInstance(1234, 2);
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(containerId, "localhost", "user",
resource, System.currentTimeMillis() + 10000, 42, 42,
Priority.newInstance(0), 0);
Token containerToken =
TestRPC.newContainerToken(nodeId, "password".getBytes(),
containerTokenIdentifier);
// Construct container resource increase request,
List<Token> increaseTokens = new ArrayList<>();
increaseTokens.add(containerToken);
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest
.newInstance(increaseTokens);
try {
proxy.increaseContainersResource(increaseRequest);
} catch (Exception e) {
LOG.info(StringUtils.stringifyException(e));
Assert.assertEquals("Error, exception is not: "
+ SocketTimeoutException.class.getName(),
SocketTimeoutException.class.getName(), e.getClass().getName());
return;
}
} finally {
server.stop();
}
Assert.fail("timeout exception should have occurred!");
}
public class DummyContainerManager implements ContainerManagementProtocol {
@Override
public StartContainersResponse startContainers(
StartContainersRequest requests) throws YarnException, IOException {
Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
throw new YarnException(e);
}
@Override
public StopContainersResponse
stopContainers(StopContainersRequest requests) throws YarnException,
IOException {
Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
throw new YarnException(e);
}
@Override
public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws YarnException, IOException {
Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
throw new YarnException(e);
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
try {
// make the thread sleep to look like its not going to respond
Thread.sleep(10000);
} catch (Exception e) {
LOG.error(e);
throw new YarnException(e);
}
throw new YarnException("Shouldn't happen!!");
}
}
}

View File

@ -33,6 +33,8 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@ -219,6 +221,12 @@ public StopContainersResponse stopContainers(StopContainersRequest request)
new Exception(EXCEPTION_CAUSE));
throw new YarnException(e);
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
}
}
public static ContainerTokenIdentifier newContainerTokenIdentifier(

View File

@ -44,6 +44,8 @@
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
@ -101,6 +103,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -278,6 +282,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto;
@ -466,6 +472,8 @@ public static void setup() throws Exception {
generateByNewInstance(ApplicationSubmissionContext.class);
generateByNewInstance(ContainerReport.class);
generateByNewInstance(ContainerResourceChangeRequest.class);
generateByNewInstance(IncreaseContainersResourceRequest.class);
generateByNewInstance(IncreaseContainersResourceResponse.class);
generateByNewInstance(ContainerStatus.class);
generateByNewInstance(PreemptionContainer.class);
generateByNewInstance(PreemptionResourceRequest.class);
@ -870,6 +878,18 @@ public void testStopContainersResponsePBImpl() throws Exception {
StopContainersResponseProto.class);
}
@Test
public void testIncreaseContainersResourceRequestPBImpl() throws Exception {
validatePBImplRecord(IncreaseContainersResourceRequestPBImpl.class,
IncreaseContainersResourceRequestProto.class);
}
@Test
public void testIncreaseContainersResourceResponsePBImpl() throws Exception {
validatePBImplRecord(IncreaseContainersResourceResponsePBImpl.class,
IncreaseContainersResourceResponseProto.class);
}
@Test
public void testSubmitApplicationRequestPBImpl() throws Exception {
validatePBImplRecord(SubmitApplicationRequestPBImpl.class,

View File

@ -58,6 +58,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -952,6 +954,17 @@ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
return containerTokenIdentifier;
}
/**
* Increase resource of a list of containers on this NodeManager.
*/
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest requests)
throws YarnException, IOException {
// To be implemented in YARN-1645
return null;
}
@Private
@VisibleForTesting
protected void updateNMTokenIdentifier(NMTokenIdentifier nmTokenIdentifier)

View File

@ -25,6 +25,8 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@ -295,7 +297,14 @@ synchronized public GetContainerStatusesResponse getContainerStatuses(
return GetContainerStatusesResponse.newInstance(statuses, null);
}
public static org.apache.hadoop.yarn.server.api.records.NodeStatus
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request)
throws YarnException, IOException {
return null;
}
public static org.apache.hadoop.yarn.server.api.records.NodeStatus
createNodeStatus(NodeId nodeId, List<ContainerStatus> containers) {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =

View File

@ -40,6 +40,8 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@ -122,6 +124,12 @@ public GetContainerStatusesResponse getContainerStatuses(
return GetContainerStatusesResponse.newInstance(null, null);
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(IncreaseContainersResourceRequest request)
throws YarnException {
return IncreaseContainersResourceResponse.newInstance(null, null);
}
public Credentials getContainerCredentials() throws IOException {
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();

View File

@ -32,6 +32,8 @@
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -126,6 +128,13 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws YarnException {
return null;
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request)
throws YarnException {
return null;
}
}
@Test