YARN-9362. Code cleanup in TestNMLeveldbStateStoreService. Contributed by Denes Gerencser

This commit is contained in:
Szilard Nemeth 2019-11-26 21:44:22 +01:00
parent aa7ab2719f
commit 828ab400ee

View File

@ -329,8 +329,248 @@ public void testApplicationStorage() throws IOException {
assertEquals(appProto1, apps.get(0)); assertEquals(appProto1, apps.get(0));
} }
@Test
public void testContainerStorageWhenContainerIsRequested()
throws IOException {
final ContainerStateConstructParams containerParams =
storeContainerInStateStore();
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
final RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
assertEquals(containerParams.getContainerStartTime().longValue(),
rcs.getStartTime());
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerParams.getContainerRequest(), rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(containerParams.getContainerResource(), rcs.getCapability());
}
@Test
public void testContainerStorageWhenContainerIsQueued()
throws IOException {
ContainerStateConstructParams containerParams =
storeContainerInStateStore();
ContainerId containerId = containerParams.getContainerId();
StartContainerRequest containerReq = containerParams.getContainerRequest();
Resource containerResource = containerParams.getContainerResource();
ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
stateStore.storeContainerQueued(containerId);
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(containerResource, rcs.getCapability());
}
@Test
public void testContainerStorageWhenContainerIsLaunched()
throws IOException {
ContainerStateConstructParams containerParams =
storeContainerInStateStore();
ContainerId containerId = containerParams.getContainerId();
StartContainerRequest containerReq = containerParams.getContainerRequest();
Resource containerResource = containerParams.getContainerResource();
ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
stateStore.storeContainerQueued(containerId);
StringBuilder diags = launchContainerWithDiagnostics(containerId);
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics());
assertEquals(containerResource, rcs.getCapability());
}
@Test
public void testContainerStorageWhenContainerIsPaused()
throws IOException {
ContainerStateConstructParams containerParams =
storeContainerInStateStore();
ContainerId containerId = containerParams.getContainerId();
StartContainerRequest containerReq = containerParams.getContainerRequest();
ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
stateStore.storeContainerQueued(containerId);
stateStore.storeContainerPaused(containerId);
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
// Resume the container
stateStore.removeContainerPaused(containerId);
restartStateStore();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
}
@Test
public void testContainerStorageWhenContainerSizeIncreased()
throws IOException {
ContainerStateConstructParams containerParams =
storeContainerInStateStore();
ContainerId containerId = containerParams.getContainerId();
ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
stateStore.storeContainerQueued(containerId);
launchContainerWithDiagnostics(containerId);
increaseContainerSize(containerId);
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(Resource.newInstance(2468, 4), rcs.getCapability());
}
@Test
public void testContainerStorageWhenContainerMarkedAsKilled()
throws IOException {
ContainerStateConstructParams containerParams =
storeContainerInStateStore();
ContainerId containerId = containerParams.getContainerId();
ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
stateStore.storeContainerQueued(containerId);
StringBuilder diags = launchContainerWithDiagnostics(containerId);
ContainerTokenIdentifier updateTokenIdentifier =
increaseContainerSize(containerId);
markContainerAsKilled(containerId, diags);
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertTrue(rcs.getKilled());
ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils
.newContainerTokenIdentifier(rcs.getStartRequest()
.getContainerToken());
assertEquals(updateTokenIdentifier, tokenReadFromRequest);
assertEquals(diags.toString(), rcs.getDiagnostics());
}
@Test
public void testContainerStorageWhenContainerCompleted()
throws IOException {
ContainerStateConstructParams containerParams =
storeContainerInStateStore();
ContainerId containerId = containerParams.getContainerId();
ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
stateStore.storeContainerQueued(containerId);
StringBuilder diags = launchContainerWithDiagnostics(containerId);
markContainerAsKilled(containerId, diags);
// add yet more diags, mark container completed
diags.append("some final diags");
stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerCompleted(containerId, 21);
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
assertEquals(21, rcs.getExitCode());
assertTrue(rcs.getKilled());
assertEquals(diags.toString(), rcs.getDiagnostics());
}
@Test @Test
public void testContainerStorage() throws IOException { public void testContainerStorage() throws IOException {
ContainerStateConstructParams containerParams =
storeContainerInStateStore();
ContainerId containerId = containerParams.getContainerId();
// remaining retry attempts, work dir and log dir are stored
stateStore.storeContainerRemainingRetryAttempts(containerId, 6);
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
stateStore.storeContainerLogDir(containerId, "/test/logdir");
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(6, rcs.getRemainingRetryAttempts());
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
validateRetryAttempts(containerId);
}
@Test
public void testContainerStorageWhenContainerRemoved()
throws IOException {
ContainerStateConstructParams containerParams =
storeContainerInStateStore();
ContainerId containerId = containerParams.getContainerId();
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
// recover again to check remove clears all containers
restartStateStore();
NMStateStoreService nmStoreSpy = spy(stateStore);
loadContainersState(nmStoreSpy.getContainerStateIterator());
verify(nmStoreSpy, times(0)).removeContainer(any(ContainerId.class));
}
private ContainerStateConstructParams storeContainerInStateStore()
throws IOException {
// test empty when no state // test empty when no state
List<RecoveredContainerState> recoveredContainers = List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator()); loadContainersState(stateStore.getContainerStateIterator());
@ -345,162 +585,115 @@ public void testContainerStorage() throws IOException {
StartContainerRequest containerReq = StartContainerRequest containerReq =
createContainerRequest(containerId, containerResource); createContainerRequest(containerId, containerResource);
// store a container and verify recovered long anyContainerStartTime = 1573155078494L;
long containerStartTime = System.currentTimeMillis(); stateStore.storeContainer(containerId, 0, anyContainerStartTime,
stateStore.storeContainer(containerId, 0, containerStartTime, containerReq); containerReq);
// verify the container version key is not stored for new containers // verify the container version key is not stored for new containers
DB db = stateStore.getDB(); DB db = stateStore.getDB();
assertNull("version key present for new container", db.get(bytes( assertNull("version key present for new container", db.get(bytes(
stateStore.getContainerVersionKey(containerId.toString())))); stateStore.getContainerVersionKey(containerId.toString()))));
restartStateStore(); return new ContainerStateConstructParams()
recoveredContainers = .setContainerRequest(containerReq)
loadContainersState(stateStore.getContainerStateIterator()); .setContainerResource(containerResource)
assertEquals(1, recoveredContainers.size()); .setContainerStartTime(anyContainerStartTime)
RecoveredContainerState rcs = recoveredContainers.get(0); .setAppAttemptId(appAttemptId)
assertEquals(0, rcs.getVersion()); .setContainerId(containerId);
assertEquals(containerStartTime, rcs.getStartTime()); }
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(containerResource, rcs.getCapability());
// store a new container record without StartContainerRequest private static class ContainerStateConstructParams {
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); private StartContainerRequest containerRequest;
stateStore.storeContainerLaunched(containerId1); private Resource containerResource;
recoveredContainers = private Long containerStartTime;
loadContainersState(stateStore.getContainerStateIterator()); private ApplicationAttemptId appAttemptId;
// check whether the new container record is discarded private ContainerId containerId;
assertEquals(1, recoveredContainers.size());
// queue the container, and verify recovered public ApplicationAttemptId getAppAttemptId() {
stateStore.storeContainerQueued(containerId); return appAttemptId;
restartStateStore(); }
recoveredContainers = public ContainerStateConstructParams setAppAttemptId(ApplicationAttemptId
loadContainersState(stateStore.getContainerStateIterator()); theAppAttemptId) {
assertEquals(1, recoveredContainers.size()); this.appAttemptId = theAppAttemptId;
rcs = recoveredContainers.get(0); return this;
assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); }
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); public ContainerId getContainerId() {
assertEquals(false, rcs.getKilled()); return containerId;
assertEquals(containerReq, rcs.getStartRequest()); }
assertTrue(rcs.getDiagnostics().isEmpty()); public ContainerStateConstructParams setContainerId(ContainerId
assertEquals(containerResource, rcs.getCapability()); theContainerId) {
this.containerId = theContainerId;
return this;
}
// launch the container, add some diagnostics, and verify recovered public StartContainerRequest getContainerRequest() {
StringBuilder diags = new StringBuilder(); return containerRequest;
stateStore.storeContainerLaunched(containerId); }
diags.append("some diags for container"); public ContainerStateConstructParams setContainerRequest(
StartContainerRequest theContainerRequest) {
this.containerRequest = theContainerRequest;
return this;
}
public Resource getContainerResource() {
return containerResource;
}
public ContainerStateConstructParams setContainerResource(
Resource theContainerResource) {
this.containerResource = theContainerResource;
return this;
}
public Long getContainerStartTime() {
return containerStartTime;
}
public ContainerStateConstructParams setContainerStartTime(
Long theContainerStartTime) {
this.containerStartTime = theContainerStartTime;
return this;
}
}
private void markContainerAsKilled(ContainerId containerId,
StringBuilder diags) throws IOException {
// mark the container killed, add some more diags
diags.append("some more diags for container");
stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerDiagnostics(containerId, diags);
restartStateStore(); stateStore.storeContainerKilled(containerId);
recoveredContainers = }
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics());
assertEquals(containerResource, rcs.getCapability());
// pause the container, and verify recovered private ContainerTokenIdentifier increaseContainerSize(
stateStore.storeContainerPaused(containerId); ContainerId containerId) throws IOException {
restartStateStore();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
// Resume the container
stateStore.removeContainerPaused(containerId);
restartStateStore();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
// increase the container size, and verify recovered
ContainerTokenIdentifier updateTokenIdentifier = ContainerTokenIdentifier updateTokenIdentifier =
new ContainerTokenIdentifier(containerId, "host", "user", new ContainerTokenIdentifier(containerId, "host", "user",
Resource.newInstance(2468, 4), 9876543210L, 42, 2468, Resource.newInstance(2468, 4), 9876543210L, 42, 2468,
Priority.newInstance(7), 13579); Priority.newInstance(7), 13579);
stateStore stateStore
.storeContainerUpdateToken(containerId, updateTokenIdentifier); .storeContainerUpdateToken(containerId, updateTokenIdentifier);
restartStateStore(); return updateTokenIdentifier;
recoveredContainers = }
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(Resource.newInstance(2468, 4), rcs.getCapability());
// mark the container killed, add some more diags, and verify recovered private StringBuilder launchContainerWithDiagnostics(ContainerId containerId)
diags.append("some more diags for container"); throws IOException {
StringBuilder diags = new StringBuilder();
stateStore.storeContainerLaunched(containerId);
diags.append("some diags for container");
stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerKilled(containerId); return diags;
restartStateStore(); }
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertTrue(rcs.getKilled());
ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils
.newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken());
assertEquals(updateTokenIdentifier, tokenReadFromRequest);
assertEquals(diags.toString(), rcs.getDiagnostics());
// add yet more diags, mark container completed, and verify recovered private void storeNewContainerRecordWithoutStartContainerRequest(
diags.append("some final diags"); ApplicationAttemptId appAttemptId) throws IOException {
stateStore.storeContainerDiagnostics(containerId, diags); // store a new container record without StartContainerRequest
stateStore.storeContainerCompleted(containerId, 21); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
restartStateStore(); stateStore.storeContainerLaunched(containerId1);
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
assertEquals(21, rcs.getExitCode());
assertTrue(rcs.getKilled());
assertEquals(diags.toString(), rcs.getDiagnostics());
// store remainingRetryAttempts, workDir and logDir List<RecoveredContainerState> recoveredContainers =
stateStore.storeContainerRemainingRetryAttempts(containerId, 6);
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
stateStore.storeContainerLogDir(containerId, "/test/logdir");
restartStateStore();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator()); loadContainersState(stateStore.getContainerStateIterator());
// check whether the new container record is discarded
assertEquals(1, recoveredContainers.size()); assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(6, rcs.getRemainingRetryAttempts());
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
validateRetryAttempts(containerId);
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
// recover again to check remove clears all containers
restartStateStore();
NMStateStoreService nmStoreSpy = spy(stateStore);
loadContainersState(nmStoreSpy.getContainerStateIterator());
verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class));
} }
private void validateRetryAttempts(ContainerId containerId) private void validateRetryAttempts(ContainerId containerId)
@ -524,11 +717,6 @@ private StartContainerRequest createContainerRequest(
return createContainerRequestInternal(containerId, res); return createContainerRequestInternal(containerId, res);
} }
private StartContainerRequest createContainerRequest(
ContainerId containerId) {
return createContainerRequestInternal(containerId, null);
}
private StartContainerRequest createContainerRequestInternal(ContainerId private StartContainerRequest createContainerRequestInternal(ContainerId
containerId, Resource res) { containerId, Resource res) {
LocalResource lrsrc = LocalResource.newInstance( LocalResource lrsrc = LocalResource.newInstance(
@ -545,9 +733,9 @@ private StartContainerRequest createContainerRequestInternal(ContainerId
containerCmds.add("somearg"); containerCmds.add("somearg");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("someservice", serviceData.put("someservice",
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
ByteBuffer containerTokens = ByteBuffer containerTokens =
ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
Map<ApplicationAccessType, String> acls = Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>(); new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
@ -676,7 +864,8 @@ public void testLocalTrackerStateIterator() throws IOException {
} }
@Test @Test
public void testStartResourceLocalization() throws IOException { public void testStartResourceLocalizationForApplicationResource()
throws IOException {
String user = "somebody"; String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationId appId = ApplicationId.newInstance(1, 1);
@ -730,10 +919,14 @@ public void testStartResourceLocalization() throws IOException {
assertEquals(1, startedResources.size()); assertEquals(1, startedResources.size());
assertEquals(appRsrcLocalPath, assertEquals(appRsrcLocalPath,
startedResources.get(appRsrcProto)); startedResources.get(appRsrcProto));
}
// start some public and private resources @Test
public void testStartResourceLocalizationForPublicResources()
throws IOException {
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
.newInstance(
URL.fromPath(pubRsrcPath1), URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L); 789L, 135L);
@ -750,23 +943,14 @@ public void testStartResourceLocalization() throws IOException {
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2, stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2); pubRsrcLocalPath2);
Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
// restart and verify resources are marked in-progress // restart and verify resources are marked in-progress
restartStateStore(); restartStateStore();
state = stateStore.loadLocalizationState(); RecoveredLocalizationState state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState(); LocalResourceTrackerState pubts = state.getPublicTrackerState();
completedResources = loadCompletedResources( List<LocalizedResourceProto> completedResources = loadCompletedResources(
pubts.getCompletedResourcesIterator()); pubts.getCompletedResourcesIterator());
startedResources = loadStartedResources( Map<LocalResourceProto, Path> startedResources = loadStartedResources(
pubts.getStartedResourcesIterator()); pubts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty()); assertTrue(completedResources.isEmpty());
assertEquals(2, startedResources.size()); assertEquals(2, startedResources.size());
@ -774,34 +958,49 @@ public void testStartResourceLocalization() throws IOException {
startedResources.get(pubRsrcProto1)); startedResources.get(pubRsrcProto1));
assertEquals(pubRsrcLocalPath2, assertEquals(pubRsrcLocalPath2,
startedResources.get(pubRsrcProto2)); startedResources.get(pubRsrcProto2));
userResources = loadUserResources(state.getIterator()); Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertEquals(0, userResources.size());
}
@Test
public void testStartResourceLocalizationForPrivateResource()
throws IOException {
Path privRsrcPath = new Path("hdfs://some/private/resource");
LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
.newInstance(
URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
String user = "somebody";
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
// restart and verify resources are marked in-progress
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertEquals(1, userResources.size()); assertEquals(1, userResources.size());
rur = userResources.get(user); RecoveredUserResources rur = userResources.get(user);
privts = rur.getPrivateTrackerState(); LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts); assertNotNull(privts);
completedResources = loadCompletedResources( List<LocalizedResourceProto> completedResources = loadCompletedResources(
privts.getCompletedResourcesIterator()); privts.getCompletedResourcesIterator());
startedResources = loadStartedResources( Map<LocalResourceProto, Path> startedResources = loadStartedResources(
privts.getStartedResourcesIterator()); privts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty()); assertTrue(completedResources.isEmpty());
assertEquals(1, startedResources.size()); assertEquals(1, startedResources.size());
assertEquals(privRsrcLocalPath, assertEquals(privRsrcLocalPath,
startedResources.get(privRsrcProto)); startedResources.get(privRsrcProto));
assertEquals(1, rur.getAppTrackerStates().size()); assertEquals(0, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
completedResources = loadCompletedResources(
appts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
appts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertEquals(1, startedResources.size());
assertEquals(appRsrcLocalPath,
startedResources.get(appRsrcProto));
} }
@Test @Test
public void testFinishResourceLocalization() throws IOException { public void testFinishResourceLocalizationForApplicationResource()
throws IOException {
String user = "somebody"; String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationId appId = ApplicationId.newInstance(1, 1);
@ -862,10 +1061,14 @@ public void testFinishResourceLocalization() throws IOException {
assertEquals(1, completedResources.size()); assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto, assertEquals(appLocalizedProto,
completedResources.iterator().next()); completedResources.iterator().next());
}
// start some public and private resources @Test
public void testFinishResourceLocalizationForPublicResources()
throws IOException {
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
.newInstance(
URL.fromPath(pubRsrcPath1), URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L); 789L, 135L);
@ -882,15 +1085,6 @@ public void testFinishResourceLocalization() throws IOException {
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2, stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2); pubRsrcLocalPath2);
Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
// finish some of the resources // finish some of the resources
LocalizedResourceProto pubLocalizedProto1 = LocalizedResourceProto pubLocalizedProto1 =
@ -900,6 +1094,43 @@ public void testFinishResourceLocalization() throws IOException {
.setSize(pubRsrcProto1.getSize()) .setSize(pubRsrcProto1.getSize())
.build(); .build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto1); stateStore.finishResourceLocalization(null, null, pubLocalizedProto1);
// restart and verify state
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
List<LocalizedResourceProto> completedResources = loadCompletedResources(
pubts.getCompletedResourcesIterator());
Map<LocalResourceProto, Path> startedResources = loadStartedResources(
pubts.getStartedResourcesIterator());
assertEquals(1, completedResources.size());
assertEquals(pubLocalizedProto1,
completedResources.iterator().next());
assertEquals(1, startedResources.size());
assertEquals(pubRsrcLocalPath2,
startedResources.get(pubRsrcProto2));
Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertEquals(0, userResources.size());
}
@Test
public void testFinishResourceLocalizationForPrivateResource()
throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
Path privRsrcPath = new Path("hdfs://some/private/resource");
LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
.newInstance(
URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
LocalizedResourceProto privLocalizedProto = LocalizedResourceProto privLocalizedProto =
LocalizedResourceProto.newBuilder() LocalizedResourceProto.newBuilder()
.setResource(privRsrcProto) .setResource(privRsrcProto)
@ -910,22 +1141,19 @@ public void testFinishResourceLocalization() throws IOException {
// restart and verify state // restart and verify state
restartStateStore(); restartStateStore();
state = stateStore.loadLocalizationState(); RecoveredLocalizationState state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState(); LocalResourceTrackerState pubts = state.getPublicTrackerState();
completedResources = loadCompletedResources( List<LocalizedResourceProto> completedResources = loadCompletedResources(
pubts.getCompletedResourcesIterator()); pubts.getCompletedResourcesIterator());
startedResources = loadStartedResources( Map<LocalResourceProto, Path> startedResources = loadStartedResources(
pubts.getStartedResourcesIterator()); pubts.getStartedResourcesIterator());
assertEquals(1, completedResources.size()); assertEquals(0, completedResources.size());
assertEquals(pubLocalizedProto1, assertEquals(0, startedResources.size());
completedResources.iterator().next()); Map<String, RecoveredUserResources> userResources =
assertEquals(1, startedResources.size()); loadUserResources(state.getIterator());
assertEquals(pubRsrcLocalPath2,
startedResources.get(pubRsrcProto2));
userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size()); assertEquals(1, userResources.size());
rur = userResources.get(user); RecoveredUserResources rur = userResources.get(user);
privts = rur.getPrivateTrackerState(); LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts); assertNotNull(privts);
completedResources = loadCompletedResources( completedResources = loadCompletedResources(
privts.getCompletedResourcesIterator()); privts.getCompletedResourcesIterator());
@ -935,21 +1163,16 @@ public void testFinishResourceLocalization() throws IOException {
assertEquals(privLocalizedProto, assertEquals(privLocalizedProto,
completedResources.iterator().next()); completedResources.iterator().next());
assertTrue(startedResources.isEmpty()); assertTrue(startedResources.isEmpty());
assertEquals(1, rur.getAppTrackerStates().size()); assertEquals(0, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId); LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts); assertNull(appts);
completedResources = loadCompletedResources(
appts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
appts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty()); assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size()); assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto,
completedResources.iterator().next());
} }
@Test @Test
public void testRemoveLocalizedResource() throws IOException { public void testRemoveLocalizedResourceForApplicationResource()
throws IOException {
String user = "somebody"; String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationId appId = ApplicationId.newInstance(1, 1);
@ -983,10 +1206,15 @@ public void testRemoveLocalizedResource() throws IOException {
restartStateStore(); restartStateStore();
verifyEmptyState(); verifyEmptyState();
}
// add public and private resources and remove some @Test
public void testRemoveLocalizedResourceForPublicResources()
throws IOException {
// add public resources and remove some
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
.newInstance(
URL.fromPath(pubRsrcPath1), URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L); 789L, 135L);
@ -1018,8 +1246,32 @@ public void testRemoveLocalizedResource() throws IOException {
.build(); .build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto2); stateStore.finishResourceLocalization(null, null, pubLocalizedProto2);
stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2); stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2);
// restart and verify state
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
List<LocalizedResourceProto> completedResources =
loadCompletedResources(pubts.getCompletedResourcesIterator());
Map<LocalResourceProto, Path> startedResources =
loadStartedResources(pubts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size());
assertEquals(pubLocalizedProto1,
completedResources.iterator().next());
Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertTrue(userResources.isEmpty());
}
@Test
public void testRemoveLocalizedResourceForPrivateResource()
throws IOException {
String user = "somebody";
Path privRsrcPath = new Path("hdfs://some/private/resource"); Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
.newInstance(
URL.fromPath(privRsrcPath), URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*"); 789L, 680L, "*pattern*");
@ -1038,9 +1290,7 @@ public void testRemoveLocalizedResource() throws IOException {
Map<LocalResourceProto, Path> startedResources = Map<LocalResourceProto, Path> startedResources =
loadStartedResources(pubts.getStartedResourcesIterator()); loadStartedResources(pubts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty()); assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size()); assertEquals(0, completedResources.size());
assertEquals(pubLocalizedProto1,
completedResources.iterator().next());
Map<String, RecoveredUserResources> userResources = Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator()); loadUserResources(state.getIterator());
assertTrue(userResources.isEmpty()); assertTrue(userResources.isEmpty());
@ -1574,9 +1824,9 @@ private StartContainerRequest storeMockContainer(ContainerId containerId)
containerCmds.add("somearg"); containerCmds.add("somearg");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("someservice", serviceData.put("someservice",
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
ByteBuffer containerTokens = ByteBuffer ByteBuffer containerTokens = ByteBuffer
.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); .wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
Map<ApplicationAccessType, String> acls = Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>(); new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); acls.put(ApplicationAccessType.VIEW_APP, "viewuser");