YARN-2558. Updated ContainerTokenIdentifier#read/write to use ContainerId#getContainerId. Contributed by Tsuyoshi OZAWA.

This commit is contained in:
Jian He 2014-09-17 15:12:17 -07:00
parent f230248525
commit f4886111aa
3 changed files with 97 additions and 2 deletions

View File

@ -377,6 +377,9 @@ Release 2.6.0 - UNRELEASED
YARN-2529. Generic history service RPC interface doesn't work when service
authorization is enabled. (Zhijie Shen via jianhe)
YARN-2558. Updated ContainerTokenIdentifier#read/write to use
ContainerId#getContainerId. (Tsuyoshi OZAWA via jianhe)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -128,7 +128,7 @@ public void write(DataOutput out) throws IOException {
out.writeLong(applicationId.getClusterTimestamp());
out.writeInt(applicationId.getId());
out.writeInt(applicationAttemptId.getAttemptId());
out.writeInt(this.containerId.getId());
out.writeLong(this.containerId.getContainerId());
out.writeUTF(this.nmHostAddr);
out.writeUTF(this.appSubmitter);
out.writeInt(this.resource.getMemory());
@ -147,7 +147,7 @@ public void readFields(DataInput in) throws IOException {
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, in.readInt());
this.containerId =
ContainerId.newInstance(applicationAttemptId, in.readInt());
ContainerId.newInstance(applicationAttemptId, in.readLong());
this.nmHostAddr = in.readUTF();
this.appSubmitter = in.readUTF();
int memory = in.readInt();

View File

@ -28,6 +28,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.LinkedList;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteStreams;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -158,6 +161,25 @@ public void testContainerManager() throws Exception {
}
}
}
@Test (timeout = 500000)
public void testContainerManagerWithEpoch() throws Exception {
try {
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
.getName(), 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
// Testing for container token tampering
testContainerTokenWithEpoch(conf);
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
yarnCluster = null;
}
}
}
private void testNMTokens(Configuration conf) throws Exception {
NMTokenSecretManagerInRM nmTokenSecretManagerRM =
@ -603,4 +625,74 @@ private void testContainerToken(Configuration conf) throws IOException,
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
containerToken, nmToken, true).contains(sb.toString()));
}
/**
* This tests whether a containerId is serialized/deserialized with epoch.
*
* @throws IOException
* @throws InterruptedException
* @throws YarnException
*/
private void testContainerTokenWithEpoch(Configuration conf)
throws IOException, InterruptedException, YarnException {
LOG.info("Running test for serializing/deserializing containerIds");
NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
yarnCluster.getResourceManager().getRMContext()
.getNMTokenSecretManager();
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
ContainerId cId = ContainerId.newInstance(appAttemptId, (5L << 40) | 3L);
NodeManager nm = yarnCluster.getNodeManager(0);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
nm.getNMContext().getNMTokenSecretManager();
String user = "test";
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
NodeId nodeId = nm.getNMContext().getNodeId();
// Both id should be equal.
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
// Creating a normal Container Token
RMContainerTokenSecretManager containerTokenSecretManager =
yarnCluster.getResourceManager().getRMContext().
getContainerTokenSecretManager();
Resource r = Resource.newInstance(1230, 2);
Token containerToken =
containerTokenSecretManager.createContainerToken(cId, nodeId, user, r,
Priority.newInstance(0), 0);
ByteArrayDataInput input = ByteStreams.newDataInput(
containerToken.getIdentifier().array());
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier();
containerTokenIdentifier.readFields(input);
Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
Assert.assertEquals(
cId.toString(), containerTokenIdentifier.getContainerID().toString());
Token nmToken =
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
YarnRPC rpc = YarnRPC.create(conf);
testStartContainer(rpc, appAttemptId, nodeId, containerToken, nmToken,
false);
List<ContainerId> containerIds = new LinkedList<ContainerId>();
containerIds.add(cId);
ContainerManagementProtocol proxy
= getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
GetContainerStatusesResponse res = proxy.getContainerStatuses(
GetContainerStatusesRequest.newInstance(containerIds));
Assert.assertNotNull(res.getContainerStatuses().get(0));
Assert.assertEquals(
cId, res.getContainerStatuses().get(0).getContainerId());
Assert.assertEquals(cId.toString(),
res.getContainerStatuses().get(0).getContainerId().toString());
}
}