YARN-11578. Cache fs supports chmod in LogAggregationFileController. (#6120)
This commit is contained in:
parent
b87180568b
commit
a04a9e107b
@ -33,7 +33,9 @@
|
|||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
@ -111,6 +113,35 @@ public abstract class LogAggregationFileController {
|
|||||||
|
|
||||||
protected boolean fsSupportsChmod = true;
|
protected boolean fsSupportsChmod = true;
|
||||||
|
|
||||||
|
private static class FsLogPathKey {
|
||||||
|
private Class<? extends FileSystem> fsType;
|
||||||
|
private Path logPath;
|
||||||
|
|
||||||
|
FsLogPathKey(Class<? extends FileSystem> fsType, Path logPath) {
|
||||||
|
this.fsType = fsType;
|
||||||
|
this.logPath = logPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
FsLogPathKey that = (FsLogPathKey) o;
|
||||||
|
return Objects.equals(fsType, that.fsType) && Objects.equals(logPath, that.logPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(fsType, logPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private static final ConcurrentHashMap<FsLogPathKey, Boolean> FS_CHMOD_CACHE
|
||||||
|
= new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public LogAggregationFileController() {}
|
public LogAggregationFileController() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -429,26 +460,34 @@ public void verifyAndCreateRemoteLogDir() {
|
|||||||
+ remoteRootLogDir + "]", e);
|
+ remoteRootLogDir + "]", e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
//Check if FS has capability to set/modify permissions
|
final FsLogPathKey key = new FsLogPathKey(remoteFS.getClass(), qualified);
|
||||||
Path permissionCheckFile = new Path(qualified, String.format("%s.permission_check",
|
FileSystem finalRemoteFS = remoteFS;
|
||||||
RandomStringUtils.randomAlphanumeric(8)));
|
fsSupportsChmod = FS_CHMOD_CACHE.computeIfAbsent(key,
|
||||||
|
k -> checkFsSupportsChmod(finalRemoteFS, remoteRootLogDir, qualified));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkFsSupportsChmod(FileSystem remoteFS, Path logDir, Path qualified) {
|
||||||
|
//Check if FS has capability to set/modify permissions
|
||||||
|
Path permissionCheckFile = new Path(qualified, String.format("%s.permission_check",
|
||||||
|
RandomStringUtils.randomAlphanumeric(8)));
|
||||||
|
try {
|
||||||
|
remoteFS.createNewFile(permissionCheckFile);
|
||||||
|
remoteFS.setPermission(permissionCheckFile, new FsPermission(TLDIR_PERMISSIONS));
|
||||||
|
return true;
|
||||||
|
} catch (UnsupportedOperationException use) {
|
||||||
|
LOG.info("Unable to set permissions for configured filesystem since"
|
||||||
|
+ " it does not support this {}", remoteFS.getScheme());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to check if FileSystem supports permissions on "
|
||||||
|
+ "remoteLogDir [{}]", logDir, e);
|
||||||
|
} finally {
|
||||||
try {
|
try {
|
||||||
remoteFS.createNewFile(permissionCheckFile);
|
remoteFS.delete(permissionCheckFile, false);
|
||||||
remoteFS.setPermission(permissionCheckFile, new FsPermission(TLDIR_PERMISSIONS));
|
} catch (IOException ignored) {
|
||||||
} catch (UnsupportedOperationException use) {
|
|
||||||
LOG.info("Unable to set permissions for configured filesystem since"
|
|
||||||
+ " it does not support this {}", remoteFS.getScheme());
|
|
||||||
fsSupportsChmod = false;
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Failed to check if FileSystem supports permissions on "
|
|
||||||
+ "remoteLogDir [" + remoteRootLogDir + "]", e);
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
remoteFS.delete(permissionCheckFile, false);
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -19,7 +19,9 @@
|
|||||||
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
@ -35,12 +37,14 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController.TLDIR_PERMISSIONS;
|
import static org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController.TLDIR_PERMISSIONS;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.argThat;
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -116,30 +120,76 @@ public String toString() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testRemoteDirCreationWithCustomUser() throws Exception {
|
void testRemoteDirCreationWithCustomUser() throws Exception {
|
||||||
|
LogAggregationFileController controller = mock(
|
||||||
|
LogAggregationFileController.class, Mockito.CALLS_REAL_METHODS);
|
||||||
FileSystem fs = mock(FileSystem.class);
|
FileSystem fs = mock(FileSystem.class);
|
||||||
|
setupCustomUserMocks(controller, fs, "/tmp/logs");
|
||||||
|
|
||||||
|
controller.initialize(new Configuration(), "TFile");
|
||||||
|
controller.fsSupportsChmod = false;
|
||||||
|
|
||||||
|
controller.verifyAndCreateRemoteLogDir();
|
||||||
|
assertPermissionFileWasUsedOneTime(fs);
|
||||||
|
assertTrue(controller.fsSupportsChmod);
|
||||||
|
|
||||||
|
doThrow(UnsupportedOperationException.class).when(fs).setPermission(any(), any());
|
||||||
|
controller.verifyAndCreateRemoteLogDir();
|
||||||
|
assertPermissionFileWasUsedOneTime(fs); // still once -> cached
|
||||||
|
assertTrue(controller.fsSupportsChmod);
|
||||||
|
|
||||||
|
controller.fsSupportsChmod = false;
|
||||||
|
controller.verifyAndCreateRemoteLogDir();
|
||||||
|
assertPermissionFileWasUsedOneTime(fs); // still once -> cached
|
||||||
|
assertTrue(controller.fsSupportsChmod);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testRemoteDirCreationWithCustomUserFsChmodNotSupported() throws Exception {
|
||||||
|
LogAggregationFileController controller = mock(
|
||||||
|
LogAggregationFileController.class, Mockito.CALLS_REAL_METHODS);
|
||||||
|
FileSystem fs = mock(FileSystem.class);
|
||||||
|
setupCustomUserMocks(controller, fs, "/tmp/logs2");
|
||||||
|
doThrow(UnsupportedOperationException.class).when(fs).setPermission(any(), any());
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "/tmp/logs2");
|
||||||
|
controller.initialize(conf, "TFile");
|
||||||
|
controller.verifyAndCreateRemoteLogDir();
|
||||||
|
assertPermissionFileWasUsedOneTime(fs);
|
||||||
|
assertFalse(controller.fsSupportsChmod);
|
||||||
|
|
||||||
|
controller.verifyAndCreateRemoteLogDir();
|
||||||
|
assertPermissionFileWasUsedOneTime(fs); // still once -> cached
|
||||||
|
assertFalse(controller.fsSupportsChmod);
|
||||||
|
|
||||||
|
controller.fsSupportsChmod = true;
|
||||||
|
controller.verifyAndCreateRemoteLogDir();
|
||||||
|
assertPermissionFileWasUsedOneTime(fs); // still once -> cached
|
||||||
|
assertFalse(controller.fsSupportsChmod);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setupCustomUserMocks(LogAggregationFileController controller,
|
||||||
|
FileSystem fs, String path)
|
||||||
|
throws URISyntaxException, IOException {
|
||||||
doReturn(new URI("")).when(fs).getUri();
|
doReturn(new URI("")).when(fs).getUri();
|
||||||
doReturn(new FileStatus(128, false, 0, 64, System.currentTimeMillis(),
|
doReturn(new FileStatus(128, false, 0, 64, System.currentTimeMillis(),
|
||||||
System.currentTimeMillis(), new FsPermission(TLDIR_PERMISSIONS),
|
System.currentTimeMillis(), new FsPermission(TLDIR_PERMISSIONS),
|
||||||
"not_yarn_user", "yarn_group", new Path("/tmp/logs"))).when(fs)
|
"not_yarn_user", "yarn_group", new Path(path))).when(fs)
|
||||||
.getFileStatus(any(Path.class));
|
.getFileStatus(any(Path.class));
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
LogAggregationFileController controller = mock(
|
|
||||||
LogAggregationFileController.class, Mockito.CALLS_REAL_METHODS);
|
|
||||||
controller.fsSupportsChmod = true;
|
|
||||||
doReturn(fs).when(controller).getFileSystem(any(Configuration.class));
|
doReturn(fs).when(controller).getFileSystem(any(Configuration.class));
|
||||||
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||||
"yarn_user", new String[]{"yarn_group", "other_group"});
|
"yarn_user", new String[]{"yarn_group", "other_group"});
|
||||||
UserGroupInformation.setLoginUser(ugi);
|
UserGroupInformation.setLoginUser(ugi);
|
||||||
|
}
|
||||||
|
|
||||||
controller.initialize(conf, "TFile");
|
private static void assertPermissionFileWasUsedOneTime(FileSystem fs) throws IOException {
|
||||||
controller.verifyAndCreateRemoteLogDir();
|
verify(fs, times(1))
|
||||||
|
.createNewFile(argThat(new PathContainsString(".permission_check")));
|
||||||
verify(fs).createNewFile(argThat(new PathContainsString(".permission_check")));
|
verify(fs, times(1))
|
||||||
verify(fs).setPermission(argThat(new PathContainsString(".permission_check")),
|
.setPermission(argThat(new PathContainsString(".permission_check")),
|
||||||
eq(new FsPermission(TLDIR_PERMISSIONS)));
|
eq(new FsPermission(TLDIR_PERMISSIONS)));
|
||||||
verify(fs).delete(argThat(new PathContainsString(".permission_check")), eq(false));
|
verify(fs, times(1))
|
||||||
assertTrue(controller.fsSupportsChmod);
|
.delete(argThat(new PathContainsString(".permission_check")), eq(false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user