YARN-9067. YARN Resource Manager is running OOM because of leak of Configuration Object. Contributed by Eric Yang.
This commit is contained in:
parent
fe7dab8ef5
commit
efc4d91cbe
@ -118,10 +118,13 @@ public class ApiServer {
|
||||
@Override
|
||||
public Void run() throws YarnException, IOException {
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
sc.actionBuild(service);
|
||||
sc.close();
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
sc.actionBuild(service);
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
@ -133,11 +136,14 @@ public class ApiServer {
|
||||
@Override
|
||||
public ApplicationId run() throws IOException, YarnException {
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
ApplicationId applicationId = sc.actionCreate(service);
|
||||
sc.close();
|
||||
return applicationId;
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
ApplicationId applicationId = sc.actionCreate(service);
|
||||
return applicationId;
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
serviceStatus.setDiagnostics("Application ID: " + applicationId);
|
||||
@ -245,29 +251,32 @@ public class ApiServer {
|
||||
public Integer run() throws Exception {
|
||||
int result = 0;
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
Exception stopException = null;
|
||||
try {
|
||||
result = sc.actionStop(appName, destroy);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
LOG.info("Successfully stopped service {}", appName);
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
Exception stopException = null;
|
||||
try {
|
||||
result = sc.actionStop(appName, destroy);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
LOG.info("Successfully stopped service {}", appName);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.info("Got exception stopping service", e);
|
||||
stopException = e;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.info("Got exception stopping service", e);
|
||||
stopException = e;
|
||||
if (destroy) {
|
||||
result = sc.actionDestroy(appName);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
LOG.info("Successfully deleted service {}", appName);
|
||||
}
|
||||
} else {
|
||||
if (stopException != null) {
|
||||
throw stopException;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
if (destroy) {
|
||||
result = sc.actionDestroy(appName);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
LOG.info("Successfully deleted service {}", appName);
|
||||
}
|
||||
} else {
|
||||
if (stopException != null) {
|
||||
throw stopException;
|
||||
}
|
||||
}
|
||||
sc.close();
|
||||
return result;
|
||||
}
|
||||
});
|
||||
@ -378,13 +387,16 @@ public class ApiServer {
|
||||
@Override
|
||||
public Map<String, Long> run() throws YarnException, IOException {
|
||||
ServiceClient sc = new ServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
Map<String, Long> original = sc.flexByRestService(appName,
|
||||
Collections.singletonMap(componentName,
|
||||
component.getNumberOfContainers()));
|
||||
sc.close();
|
||||
return original;
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
Map<String, Long> original = sc.flexByRestService(appName,
|
||||
Collections.singletonMap(componentName,
|
||||
component.getNumberOfContainers()));
|
||||
return original;
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
ServiceStatus status = new ServiceStatus();
|
||||
@ -632,12 +644,15 @@ public class ApiServer {
|
||||
public Integer run() throws YarnException, IOException {
|
||||
int result = 0;
|
||||
ServiceClient sc = new ServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result = sc
|
||||
.actionFlex(appName, componentCountStrings);
|
||||
sc.close();
|
||||
return Integer.valueOf(result);
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result = sc
|
||||
.actionFlex(appName, componentCountStrings);
|
||||
return Integer.valueOf(result);
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
if (result == EXIT_SUCCESS) {
|
||||
@ -658,12 +673,15 @@ public class ApiServer {
|
||||
@Override
|
||||
public String run() throws YarnException, IOException {
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
String newLifeTime = sc.updateLifetime(appName,
|
||||
updateAppData.getLifetime());
|
||||
sc.close();
|
||||
return newLifeTime;
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
String newLifeTime = sc.updateLifetime(appName,
|
||||
updateAppData.getLifetime());
|
||||
return newLifeTime;
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
ServiceStatus status = new ServiceStatus();
|
||||
@ -681,11 +699,14 @@ public class ApiServer {
|
||||
@Override public ApplicationId run()
|
||||
throws YarnException, IOException {
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
ApplicationId appId = sc.actionStartAndGetId(appName);
|
||||
sc.close();
|
||||
return appId;
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
ApplicationId appId = sc.actionStartAndGetId(appName);
|
||||
return appId;
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
LOG.info("Successfully started service " + appName);
|
||||
@ -702,14 +723,17 @@ public class ApiServer {
|
||||
ServiceStatus status = new ServiceStatus();
|
||||
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
|
||||
sc.actionUpgradeExpress(service);
|
||||
} else {
|
||||
sc.initiateUpgrade(service);
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
|
||||
sc.actionUpgradeExpress(service);
|
||||
} else {
|
||||
sc.initiateUpgrade(service);
|
||||
}
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
sc.close();
|
||||
return null;
|
||||
});
|
||||
LOG.info("Service {} version {} upgrade initialized", service.getName(),
|
||||
@ -724,11 +748,14 @@ public class ApiServer {
|
||||
final UserGroupInformation ugi) throws IOException, InterruptedException {
|
||||
int result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
int exitCode = sc.actionCancelUpgrade(serviceName);
|
||||
sc.close();
|
||||
return exitCode;
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
int exitCode = sc.actionCancelUpgrade(serviceName);
|
||||
return exitCode;
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
});
|
||||
if (result == EXIT_SUCCESS) {
|
||||
ServiceStatus status = new ServiceStatus();
|
||||
@ -793,10 +820,13 @@ public class ApiServer {
|
||||
return ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
|
||||
int result1;
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result1 = sc.actionUpgrade(service, containers);
|
||||
sc.close();
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result1 = sc.actionUpgrade(service, containers);
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
return result1;
|
||||
});
|
||||
}
|
||||
@ -815,12 +845,15 @@ public class ApiServer {
|
||||
public Integer run() throws YarnException, IOException {
|
||||
int result = 0;
|
||||
ServiceClient sc = new ServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result = sc
|
||||
.actionDecommissionInstances(appName, instances);
|
||||
sc.close();
|
||||
return Integer.valueOf(result);
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result = sc
|
||||
.actionDecommissionInstances(appName, instances);
|
||||
return Integer.valueOf(result);
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
if (result == EXIT_SUCCESS) {
|
||||
@ -840,11 +873,14 @@ public class ApiServer {
|
||||
|
||||
return ugi.doAs((PrivilegedExceptionAction<Service>) () -> {
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
Service app1 = sc.getStatus(serviceName);
|
||||
sc.close();
|
||||
return app1;
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
Service app1 = sc.getStatus(serviceName);
|
||||
return app1;
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -855,12 +891,15 @@ public class ApiServer {
|
||||
return ugi.doAs((PrivilegedExceptionAction<ComponentContainers[]>) () -> {
|
||||
ComponentContainers[] result;
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result = sc.getContainers(serviceName, componentNames, version,
|
||||
containerStates);
|
||||
sc.close();
|
||||
return result;
|
||||
try {
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result = sc.getContainers(serviceName, componentNames, version,
|
||||
containerStates);
|
||||
return result;
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -75,7 +75,7 @@ public class ServiceClientTest extends ServiceClient {
|
||||
|
||||
public void forceStop() {
|
||||
expectedInstances.clear();
|
||||
super.stop();
|
||||
stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -163,6 +163,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
||||
if (registryClient != null) {
|
||||
registryClient.stop();
|
||||
}
|
||||
fs.getFileSystem().close();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user