YARN-9521. Handle FileSystem close in ApiServiceClient
Contributed by kyungwan nam. Reviewed by Eric Yang.
This commit is contained in:
parent
0ddb5f0881
commit
fe7d67a8a2
@ -24,6 +24,7 @@
|
|||||||
import java.text.MessageFormat;
|
import java.text.MessageFormat;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
import javax.ws.rs.core.HttpHeaders;
|
import javax.ws.rs.core.HttpHeaders;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
@ -657,13 +658,26 @@ public int actionUpgradeComponents(String appName, List<String> components)
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int actionCleanUp(String appName, String userName) throws
|
public int actionCleanUp(String appName, String userName) throws
|
||||||
IOException, YarnException {
|
IOException, YarnException, InterruptedException {
|
||||||
ServiceClient sc = new ServiceClient();
|
UserGroupInformation proxyUser;
|
||||||
sc.init(getConfig());
|
UserGroupInformation ugi;
|
||||||
sc.start();
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
int result = sc.actionCleanUp(appName, userName);
|
proxyUser = UserGroupInformation.getLoginUser();
|
||||||
sc.close();
|
ugi = UserGroupInformation.createProxyUser(userName, proxyUser);
|
||||||
return result;
|
} else {
|
||||||
|
ugi = UserGroupInformation.createRemoteUser(userName);
|
||||||
|
}
|
||||||
|
return ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
|
||||||
|
ServiceClient sc = new ServiceClient();
|
||||||
|
try {
|
||||||
|
sc.init(getConfig());
|
||||||
|
sc.start();
|
||||||
|
int result = sc.actionCleanUp(appName, userName);
|
||||||
|
return result;
|
||||||
|
} finally {
|
||||||
|
sc.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -18,6 +18,10 @@
|
|||||||
package org.apache.hadoop.yarn.service.client;
|
package org.apache.hadoop.yarn.service.client;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
@ -40,6 +44,8 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test class for system service manager.
|
* Test class for system service manager.
|
||||||
*/
|
*/
|
||||||
@ -207,4 +213,53 @@ private void verifyForLaunchedUserServices() {
|
|||||||
Assert.assertTrue(services.containsAll(serviceSet));
|
Assert.assertTrue(services.containsAll(serviceSet));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFileSystemCloseWhenCleanUpService() throws Exception {
|
||||||
|
FileSystem fs = null;
|
||||||
|
Path path = new Path("/tmp/servicedir");
|
||||||
|
|
||||||
|
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
|
||||||
|
.numDataNodes(1).build();
|
||||||
|
|
||||||
|
fs = hdfsCluster.getFileSystem();
|
||||||
|
if (!fs.exists(path)) {
|
||||||
|
fs.mkdirs(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
SystemServiceManagerImpl serviceManager = new SystemServiceManagerImpl();
|
||||||
|
|
||||||
|
hdfsConfig.set(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY,
|
||||||
|
path.toString());
|
||||||
|
serviceManager.init(hdfsConfig);
|
||||||
|
|
||||||
|
// the FileSystem object owned by SystemServiceManager must not be closed
|
||||||
|
// when cleanup a service
|
||||||
|
hdfsConfig.set("hadoop.registry.zk.connection.timeout.ms", "100");
|
||||||
|
hdfsConfig.set("hadoop.registry.zk.retry.times", "1");
|
||||||
|
ApiServiceClient asc = new ApiServiceClient();
|
||||||
|
asc.serviceInit(hdfsConfig);
|
||||||
|
asc.actionCleanUp("testapp", "testuser");
|
||||||
|
|
||||||
|
try {
|
||||||
|
serviceManager.start();
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (e.getMessage().contains("Filesystem closed")) {
|
||||||
|
fail("SystemServiceManagerImpl failed to handle " +
|
||||||
|
"FileSystem close");
|
||||||
|
} else {
|
||||||
|
fail("Should not get any exceptions");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
serviceManager.stop();
|
||||||
|
fs = hdfsCluster.getFileSystem();
|
||||||
|
if (fs.exists(path)) {
|
||||||
|
fs.delete(path, true);
|
||||||
|
}
|
||||||
|
if (hdfsCluster != null) {
|
||||||
|
hdfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -281,7 +281,7 @@ public abstract int actionUpgradeComponents(String appName,
|
|||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract int actionCleanUp(String appName, String userName) throws
|
public abstract int actionCleanUp(String appName, String userName) throws
|
||||||
IOException, YarnException;
|
IOException, YarnException, InterruptedException;
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
|
Loading…
Reference in New Issue
Block a user