YARN-6966. NodeManager metrics may return wrong negative values when NM restart. (Szilard Nemeth via Haibo Chen)
This commit is contained in:
parent
3a9e25edf5
commit
9d3c39e9dd
@ -496,7 +496,7 @@ protected void recoverActiveContainer(Application app,
|
||||
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||
launchContext, credentials, metrics, token, context, rcs);
|
||||
context.getContainers().put(token.getContainerID(), container);
|
||||
containerScheduler.recoverActiveContainer(container, rcs.getStatus());
|
||||
containerScheduler.recoverActiveContainer(container, rcs);
|
||||
app.handle(new ApplicationContainerInitEvent(container));
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,9 @@
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
|
||||
.RecoveredContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -259,11 +262,11 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
|
||||
* @param rcs Recovered Container status
|
||||
*/
|
||||
public void recoverActiveContainer(Container container,
|
||||
RecoveredContainerStatus rcs) {
|
||||
RecoveredContainerState rcs) {
|
||||
ExecutionType execType =
|
||||
container.getContainerTokenIdentifier().getExecutionType();
|
||||
if (rcs == RecoveredContainerStatus.QUEUED
|
||||
|| rcs == RecoveredContainerStatus.PAUSED) {
|
||||
if (rcs.getStatus() == RecoveredContainerStatus.QUEUED
|
||||
|| rcs.getStatus() == RecoveredContainerStatus.PAUSED) {
|
||||
if (execType == ExecutionType.GUARANTEED) {
|
||||
queuedGuaranteedContainers.put(container.getContainerId(), container);
|
||||
} else if (execType == ExecutionType.OPPORTUNISTIC) {
|
||||
@ -274,10 +277,15 @@ public void recoverActiveContainer(Container container,
|
||||
"UnKnown execution type received " + container.getContainerId()
|
||||
+ ", execType " + execType);
|
||||
}
|
||||
} else if (rcs == RecoveredContainerStatus.LAUNCHED) {
|
||||
} else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
|
||||
runningContainers.put(container.getContainerId(), container);
|
||||
utilizationTracker.addContainerResources(container);
|
||||
}
|
||||
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED
|
||||
&& rcs.getCapability() != null) {
|
||||
metrics.launchedContainer();
|
||||
metrics.allocateContainer(rcs.getCapability());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -52,6 +52,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.fusesource.leveldbjni.JniDBFactory;
|
||||
@ -237,7 +238,7 @@ public List<RecoveredContainerState> loadContainersState()
|
||||
iter.seek(bytes(CONTAINERS_KEY_PREFIX));
|
||||
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[],byte[]> entry = iter.peekNext();
|
||||
Entry<byte[], byte[]> entry = iter.peekNext();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
|
||||
break;
|
||||
@ -299,6 +300,10 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
|
||||
if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
|
||||
rcs.startRequest = new StartContainerRequestPBImpl(
|
||||
StartContainerRequestProto.parseFrom(entry.getValue()));
|
||||
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
||||
.newContainerTokenIdentifier(rcs.startRequest.getContainerToken());
|
||||
rcs.capability = new ResourcePBImpl(
|
||||
containerTokenIdentifier.getProto().getResource());
|
||||
} else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) {
|
||||
rcs.version = Integer.parseInt(asString(entry.getValue()));
|
||||
} else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) {
|
||||
@ -382,24 +387,25 @@ public void storeContainer(ContainerId containerId, int containerVersion,
|
||||
LOG.debug("storeContainer: containerId= " + idStr
|
||||
+ ", startRequest= " + startRequest);
|
||||
}
|
||||
String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
|
||||
String keyVersion = getContainerVersionKey(idStr);
|
||||
String keyStartTime =
|
||||
final String keyVersion = getContainerVersionKey(idStr);
|
||||
final String keyRequest =
|
||||
getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
|
||||
final StartContainerRequestProto startContainerRequest =
|
||||
((StartContainerRequestPBImpl) startRequest).getProto();
|
||||
|
||||
final String keyStartTime =
|
||||
getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX);
|
||||
final String startTimeValue = Long.toString(startTime);
|
||||
|
||||
try {
|
||||
WriteBatch batch = db.createWriteBatch();
|
||||
try {
|
||||
batch.put(bytes(keyRequest),
|
||||
((StartContainerRequestPBImpl) startRequest).getProto().
|
||||
toByteArray());
|
||||
batch.put(bytes(keyStartTime), bytes(Long.toString(startTime)));
|
||||
try (WriteBatch batch = db.createWriteBatch()) {
|
||||
batch.put(bytes(keyRequest), startContainerRequest.toByteArray());
|
||||
batch.put(bytes(keyStartTime), bytes(startTimeValue));
|
||||
if (containerVersion != 0) {
|
||||
batch.put(bytes(keyVersion),
|
||||
bytes(Integer.toString(containerVersion)));
|
||||
}
|
||||
db.write(batch);
|
||||
} finally {
|
||||
batch.close();
|
||||
}
|
||||
} catch (DBException e) {
|
||||
markStoreUnHealthy(e);
|
||||
|
@ -73,7 +73,7 @@ public List<RecoveredContainerState> loadContainersState()
|
||||
|
||||
@Override
|
||||
public void storeContainer(ContainerId containerId, int version,
|
||||
long startTime, StartContainerRequest startRequest) throws IOException {
|
||||
long startTime, StartContainerRequest startRequest) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -416,7 +416,8 @@ public abstract List<RecoveredContainerState> loadContainersState()
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void storeContainer(ContainerId containerId,
|
||||
int containerVersion, long startTime, StartContainerRequest startRequest)
|
||||
int containerVersion, long startTime,
|
||||
StartContainerRequest startRequest)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -107,7 +107,7 @@ public abstract class BaseContainerManagerTest {
|
||||
protected static File remoteLogDir;
|
||||
protected static File tmpDir;
|
||||
|
||||
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||
protected NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||
|
||||
public BaseContainerManagerTest() throws UnsupportedFileSystemException {
|
||||
localFS = FileContext.getLocalFSFileContext();
|
||||
|
@ -47,6 +47,7 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.net.ServerSocketUtil;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -106,6 +107,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.TestNodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
@ -400,6 +402,61 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure()
|
||||
cm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeManagerMetricsRecovery() throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
|
||||
|
||||
NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||
stateStore.init(conf);
|
||||
stateStore.start();
|
||||
Context context = createContext(conf, stateStore);
|
||||
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
||||
cm.init(conf);
|
||||
cm.start();
|
||||
metrics.addResource(Resource.newInstance(10240, 8));
|
||||
|
||||
// add an application by starting a container
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
||||
Map<String, String> containerEnv = Collections.emptyMap();
|
||||
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
|
||||
Map<String, LocalResource> localResources = Collections.emptyMap();
|
||||
List<String> commands = Arrays.asList("sleep 60s".split(" "));
|
||||
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||
localResources, containerEnv, commands, serviceData,
|
||||
null, null);
|
||||
StartContainersResponse startResponse = startContainer(context, cm, cid,
|
||||
clc, null, ContainerType.TASK);
|
||||
assertTrue(startResponse.getFailedRequests().isEmpty());
|
||||
assertEquals(1, context.getApplications().size());
|
||||
Application app = context.getApplications().get(appId);
|
||||
assertNotNull(app);
|
||||
|
||||
// make sure the container reaches RUNNING state
|
||||
waitForNMContainerState(cm, cid,
|
||||
org.apache.hadoop.yarn.server.nodemanager
|
||||
.containermanager.container.ContainerState.RUNNING);
|
||||
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
|
||||
|
||||
// restart and verify metrics could be recovered
|
||||
cm.stop();
|
||||
DefaultMetricsSystem.shutdown();
|
||||
metrics = NodeManagerMetrics.create();
|
||||
metrics.addResource(Resource.newInstance(10240, 8));
|
||||
TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 8);
|
||||
context = createContext(conf, stateStore);
|
||||
cm = createContainerManager(context, delSrvc);
|
||||
cm.init(conf);
|
||||
cm.start();
|
||||
assertEquals(1, context.getApplications().size());
|
||||
app = context.getApplications().get(appId);
|
||||
assertNotNull(app);
|
||||
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
|
||||
cm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerResizeRecovery() throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
@ -31,6 +32,8 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
|
||||
.RecoveredContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -71,6 +74,13 @@ public class TestContainerSchedulerRecovery {
|
||||
|
||||
private ContainerScheduler spy;
|
||||
|
||||
private RecoveredContainerState createRecoveredContainerState(
|
||||
RecoveredContainerStatus status) {
|
||||
RecoveredContainerState mockState = mock(RecoveredContainerState.class);
|
||||
when(mockState.getStatus()).thenReturn(status);
|
||||
return mockState;
|
||||
}
|
||||
|
||||
@Before public void setUp() throws Exception {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
spy = spy(tempContainerScheduler);
|
||||
@ -94,7 +104,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -113,7 +124,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -132,7 +144,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -151,7 +164,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -170,7 +184,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -189,7 +204,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -208,7 +224,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.REQUESTED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -227,7 +244,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.REQUESTED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -246,7 +264,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.COMPLETED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -265,7 +284,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.COMPLETED);
|
||||
when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
@ -284,7 +304,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
@ -302,7 +323,8 @@ public class TestContainerSchedulerRecovery {
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
|
||||
assertEquals(0, spy.getNumRunningContainers());
|
||||
RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
|
||||
RecoveredContainerState rcs =
|
||||
createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
|
||||
when(container.getContainerTokenIdentifier()).thenReturn(token);
|
||||
spy.recoverActiveContainer(container, rcs);
|
||||
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
|
||||
|
@ -113,8 +113,8 @@ public void testReferenceOfSingletonJvmMetrics() {
|
||||
assertGauge("AvailableVCores", 19, rb);
|
||||
}
|
||||
|
||||
private void checkMetrics(int launched, int completed, int failed, int killed,
|
||||
int initing, int running, int allocatedGB,
|
||||
public static void checkMetrics(int launched, int completed, int failed,
|
||||
int killed, int initing, int running, int allocatedGB,
|
||||
int allocatedContainers, int availableGB, int allocatedVCores,
|
||||
int availableVCores) {
|
||||
MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");
|
||||
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||
@ -45,6 +46,9 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
||||
public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
|
||||
private Map<ContainerId, RecoveredContainerState> containerStates;
|
||||
@ -132,11 +136,19 @@ public synchronized List<RecoveredContainerState> loadContainersState()
|
||||
|
||||
@Override
|
||||
public synchronized void storeContainer(ContainerId containerId,
|
||||
int version, long startTime, StartContainerRequest startRequest)
|
||||
throws IOException {
|
||||
int version, long startTime, StartContainerRequest startRequest) {
|
||||
RecoveredContainerState rcs = new RecoveredContainerState();
|
||||
rcs.startRequest = startRequest;
|
||||
rcs.version = version;
|
||||
try {
|
||||
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
||||
.newContainerTokenIdentifier(startRequest.getContainerToken());
|
||||
rcs.capability =
|
||||
new ResourcePBImpl(containerTokenIdentifier.getProto().getResource());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
rcs.setStartTime(startTime);
|
||||
containerStates.put(containerId, rcs);
|
||||
}
|
||||
|
@ -238,7 +238,9 @@ public void testContainerStorage() throws IOException {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 4);
|
||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
||||
StartContainerRequest containerReq = createContainerRequest(containerId);
|
||||
Resource containerResource = Resource.newInstance(1024, 2);
|
||||
StartContainerRequest containerReq =
|
||||
createContainerRequest(containerId, containerResource);
|
||||
|
||||
// store a container and verify recovered
|
||||
long containerStartTime = System.currentTimeMillis();
|
||||
@ -260,6 +262,7 @@ public void testContainerStorage() throws IOException {
|
||||
assertEquals(false, rcs.getKilled());
|
||||
assertEquals(containerReq, rcs.getStartRequest());
|
||||
assertTrue(rcs.getDiagnostics().isEmpty());
|
||||
assertEquals(containerResource, rcs.getCapability());
|
||||
|
||||
// store a new container record without StartContainerRequest
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
|
||||
@ -279,6 +282,7 @@ public void testContainerStorage() throws IOException {
|
||||
assertEquals(false, rcs.getKilled());
|
||||
assertEquals(containerReq, rcs.getStartRequest());
|
||||
assertTrue(rcs.getDiagnostics().isEmpty());
|
||||
assertEquals(containerResource, rcs.getCapability());
|
||||
|
||||
// launch the container, add some diagnostics, and verify recovered
|
||||
StringBuilder diags = new StringBuilder();
|
||||
@ -294,6 +298,7 @@ public void testContainerStorage() throws IOException {
|
||||
assertEquals(false, rcs.getKilled());
|
||||
assertEquals(containerReq, rcs.getStartRequest());
|
||||
assertEquals(diags.toString(), rcs.getDiagnostics());
|
||||
assertEquals(containerResource, rcs.getCapability());
|
||||
|
||||
// pause the container, and verify recovered
|
||||
stateStore.storeContainerPaused(containerId);
|
||||
@ -394,8 +399,18 @@ private void validateRetryAttempts(ContainerId containerId)
|
||||
assertEquals(1462700529120L, (long)recoveredRestartTimes.get(2));
|
||||
}
|
||||
|
||||
private StartContainerRequest createContainerRequest(
|
||||
ContainerId containerId, Resource res) {
|
||||
return createContainerRequestInternal(containerId, res);
|
||||
}
|
||||
|
||||
private StartContainerRequest createContainerRequest(
|
||||
ContainerId containerId) {
|
||||
return createContainerRequestInternal(containerId, null);
|
||||
}
|
||||
|
||||
private StartContainerRequest createContainerRequestInternal(ContainerId
|
||||
containerId, Resource res) {
|
||||
LocalResource lrsrc = LocalResource.newInstance(
|
||||
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
|
||||
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
|
||||
@ -421,6 +436,10 @@ private StartContainerRequest createContainerRequest(
|
||||
localResources, env, containerCmds, serviceData, containerTokens,
|
||||
acls);
|
||||
Resource containerRsrc = Resource.newInstance(1357, 3);
|
||||
|
||||
if (res != null) {
|
||||
containerRsrc = res;
|
||||
}
|
||||
ContainerTokenIdentifier containerTokenId =
|
||||
new ContainerTokenIdentifier(containerId, "host", "user",
|
||||
containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
|
||||
|
Loading…
Reference in New Issue
Block a user