Merge branch 'trunk' into HDFS-6581
This commit is contained in:
commit
900f6e52ec
@ -377,6 +377,9 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2529. Generic history service RPC interface doesn't work when service
|
||||
authorization is enabled. (Zhijie Shen via jianhe)
|
||||
|
||||
YARN-2558. Updated ContainerTokenIdentifier#read/write to use
|
||||
ContainerId#getContainerId. (Tsuyoshi OZAWA via jianhe)
|
||||
|
||||
Release 2.5.1 - 2014-09-05
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -128,7 +128,7 @@ public void write(DataOutput out) throws IOException {
|
||||
out.writeLong(applicationId.getClusterTimestamp());
|
||||
out.writeInt(applicationId.getId());
|
||||
out.writeInt(applicationAttemptId.getAttemptId());
|
||||
out.writeInt(this.containerId.getId());
|
||||
out.writeLong(this.containerId.getContainerId());
|
||||
out.writeUTF(this.nmHostAddr);
|
||||
out.writeUTF(this.appSubmitter);
|
||||
out.writeInt(this.resource.getMemory());
|
||||
@ -147,7 +147,7 @@ public void readFields(DataInput in) throws IOException {
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
ApplicationAttemptId.newInstance(applicationId, in.readInt());
|
||||
this.containerId =
|
||||
ContainerId.newInstance(applicationAttemptId, in.readInt());
|
||||
ContainerId.newInstance(applicationAttemptId, in.readLong());
|
||||
this.nmHostAddr = in.readUTF();
|
||||
this.appSubmitter = in.readUTF();
|
||||
int memory = in.readInt();
|
||||
|
@ -28,6 +28,9 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.LinkedList;
|
||||
import com.google.common.io.ByteArrayDataInput;
|
||||
import com.google.common.io.ByteStreams;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -159,6 +162,25 @@ public void testContainerManager() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 500000)
|
||||
public void testContainerManagerWithEpoch() throws Exception {
|
||||
try {
|
||||
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
|
||||
.getName(), 1, 1, 1);
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
|
||||
// Testing for container token tampering
|
||||
testContainerTokenWithEpoch(conf);
|
||||
|
||||
} finally {
|
||||
if (yarnCluster != null) {
|
||||
yarnCluster.stop();
|
||||
yarnCluster = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testNMTokens(Configuration conf) throws Exception {
|
||||
NMTokenSecretManagerInRM nmTokenSecretManagerRM =
|
||||
yarnCluster.getResourceManager().getRMContext()
|
||||
@ -603,4 +625,74 @@ private void testContainerToken(Configuration conf) throws IOException,
|
||||
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
|
||||
containerToken, nmToken, true).contains(sb.toString()));
|
||||
}
|
||||
|
||||
/**
|
||||
* This tests whether a containerId is serialized/deserialized with epoch.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws YarnException
|
||||
*/
|
||||
private void testContainerTokenWithEpoch(Configuration conf)
|
||||
throws IOException, InterruptedException, YarnException {
|
||||
|
||||
LOG.info("Running test for serializing/deserializing containerIds");
|
||||
|
||||
NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
|
||||
yarnCluster.getResourceManager().getRMContext()
|
||||
.getNMTokenSecretManager();
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 0);
|
||||
ContainerId cId = ContainerId.newInstance(appAttemptId, (5L << 40) | 3L);
|
||||
NodeManager nm = yarnCluster.getNodeManager(0);
|
||||
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
|
||||
nm.getNMContext().getNMTokenSecretManager();
|
||||
String user = "test";
|
||||
|
||||
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
|
||||
|
||||
NodeId nodeId = nm.getNMContext().getNodeId();
|
||||
|
||||
// Both id should be equal.
|
||||
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
|
||||
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
|
||||
|
||||
// Creating a normal Container Token
|
||||
RMContainerTokenSecretManager containerTokenSecretManager =
|
||||
yarnCluster.getResourceManager().getRMContext().
|
||||
getContainerTokenSecretManager();
|
||||
Resource r = Resource.newInstance(1230, 2);
|
||||
Token containerToken =
|
||||
containerTokenSecretManager.createContainerToken(cId, nodeId, user, r,
|
||||
Priority.newInstance(0), 0);
|
||||
|
||||
ByteArrayDataInput input = ByteStreams.newDataInput(
|
||||
containerToken.getIdentifier().array());
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier();
|
||||
containerTokenIdentifier.readFields(input);
|
||||
Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
|
||||
Assert.assertEquals(
|
||||
cId.toString(), containerTokenIdentifier.getContainerID().toString());
|
||||
|
||||
Token nmToken =
|
||||
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
|
||||
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
testStartContainer(rpc, appAttemptId, nodeId, containerToken, nmToken,
|
||||
false);
|
||||
|
||||
List<ContainerId> containerIds = new LinkedList<ContainerId>();
|
||||
containerIds.add(cId);
|
||||
ContainerManagementProtocol proxy
|
||||
= getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
|
||||
GetContainerStatusesResponse res = proxy.getContainerStatuses(
|
||||
GetContainerStatusesRequest.newInstance(containerIds));
|
||||
Assert.assertNotNull(res.getContainerStatuses().get(0));
|
||||
Assert.assertEquals(
|
||||
cId, res.getContainerStatuses().get(0).getContainerId());
|
||||
Assert.assertEquals(cId.toString(),
|
||||
res.getContainerStatuses().get(0).getContainerId().toString());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user