HDFS-5040.Audit log for admin commands/ logging output of all DFS admin commands. Contributed by Kuhu Shukla.
This commit is contained in:
parent
14fec04ed3
commit
9d3e4cccf9
@ -87,6 +87,7 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
|
||||
@ -351,7 +352,7 @@ boolean isAuditEnabled() {
|
||||
&& !auditLoggers.isEmpty();
|
||||
}
|
||||
|
||||
private void logAuditEvent(boolean succeeded, String cmd, String src)
|
||||
void logAuditEvent(boolean succeeded, String cmd, String src)
|
||||
throws IOException {
|
||||
logAuditEvent(succeeded, cmd, src, null, null);
|
||||
}
|
||||
@ -1715,7 +1716,8 @@ public BlocksWithLocations getBlocks(DatanodeID datanode, long size)
|
||||
* Dump all metadata into specified file
|
||||
*/
|
||||
void metaSave(String filename) throws IOException {
|
||||
checkSuperuserPrivilege();
|
||||
String operationName = "metaSave";
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
writeLock();
|
||||
try {
|
||||
@ -1727,8 +1729,9 @@ void metaSave(String filename) throws IOException {
|
||||
out.flush();
|
||||
out.close();
|
||||
} finally {
|
||||
writeUnlock("metaSave");
|
||||
writeUnlock(operationName);
|
||||
}
|
||||
logAuditEvent(true, operationName, null);
|
||||
}
|
||||
|
||||
private void metaSave(PrintWriter out) {
|
||||
@ -3123,7 +3126,7 @@ void setQuota(String src, long nsQuota, long ssQuota, StorageType type)
|
||||
requireEffectiveLayoutVersionForFeature(Feature.QUOTA_BY_STORAGE_TYPE);
|
||||
}
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final String operationName = "setQuota";
|
||||
final String operationName = getQuotaCommand(nsQuota, ssQuota);
|
||||
writeLock();
|
||||
boolean success = false;
|
||||
try {
|
||||
@ -4207,30 +4210,34 @@ int getNumberOfDatanodes(DatanodeReportType type) {
|
||||
}
|
||||
}
|
||||
|
||||
DatanodeInfo[] datanodeReport(final DatanodeReportType type
|
||||
) throws AccessControlException, StandbyException {
|
||||
checkSuperuserPrivilege();
|
||||
DatanodeInfo[] datanodeReport(final DatanodeReportType type)
|
||||
throws IOException {
|
||||
String operationName = "datanodeReport";
|
||||
DatanodeInfo[] arr;
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
final DatanodeManager dm = getBlockManager().getDatanodeManager();
|
||||
final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
|
||||
|
||||
DatanodeInfo[] arr = new DatanodeInfo[results.size()];
|
||||
arr = new DatanodeInfo[results.size()];
|
||||
for (int i=0; i<arr.length; i++) {
|
||||
arr[i] = new DatanodeInfoBuilder().setFrom(results.get(i))
|
||||
.build();
|
||||
}
|
||||
return arr;
|
||||
} finally {
|
||||
readUnlock("datanodeReport");
|
||||
readUnlock(operationName);
|
||||
}
|
||||
logAuditEvent(true, operationName, null);
|
||||
return arr;
|
||||
}
|
||||
|
||||
DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
|
||||
) throws AccessControlException, StandbyException {
|
||||
checkSuperuserPrivilege();
|
||||
) throws IOException {
|
||||
String operationName = "getDatanodeStorageReport";
|
||||
DatanodeStorageReport[] reports;
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
readLock();
|
||||
try {
|
||||
@ -4238,17 +4245,18 @@ DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
|
||||
final DatanodeManager dm = getBlockManager().getDatanodeManager();
|
||||
final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
|
||||
|
||||
DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes.size()];
|
||||
reports = new DatanodeStorageReport[datanodes.size()];
|
||||
for (int i = 0; i < reports.length; i++) {
|
||||
final DatanodeDescriptor d = datanodes.get(i);
|
||||
reports[i] = new DatanodeStorageReport(
|
||||
new DatanodeInfoBuilder().setFrom(d).build(),
|
||||
d.getStorageReports());
|
||||
}
|
||||
return reports;
|
||||
} finally {
|
||||
readUnlock("getDatanodeStorageReport");
|
||||
}
|
||||
logAuditEvent(true, operationName, null);
|
||||
return reports;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -4258,8 +4266,9 @@ DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
|
||||
*/
|
||||
boolean saveNamespace(final long timeWindow, final long txGap)
|
||||
throws IOException {
|
||||
String operationName = "saveNamespace";
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
checkSuperuserPrivilege();
|
||||
checkSuperuserPrivilege(operationName);
|
||||
|
||||
boolean saved = false;
|
||||
cpLock(); // Block if a checkpointing is in progress on standby.
|
||||
@ -4273,12 +4282,13 @@ boolean saveNamespace(final long timeWindow, final long txGap)
|
||||
}
|
||||
saved = getFSImage().saveNamespace(timeWindow, txGap, this);
|
||||
} finally {
|
||||
readUnlock("saveNamespace");
|
||||
readUnlock(operationName);
|
||||
cpUnlock();
|
||||
}
|
||||
if (saved) {
|
||||
LOG.info("New namespace image has been created");
|
||||
}
|
||||
logAuditEvent(true, operationName, null);
|
||||
return saved;
|
||||
}
|
||||
|
||||
@ -4288,9 +4298,10 @@ boolean saveNamespace(final long timeWindow, final long txGap)
|
||||
*
|
||||
* @throws AccessControlException if superuser privilege is violated.
|
||||
*/
|
||||
boolean restoreFailedStorage(String arg) throws AccessControlException,
|
||||
StandbyException {
|
||||
checkSuperuserPrivilege();
|
||||
boolean restoreFailedStorage(String arg) throws IOException {
|
||||
String operationName = getFailedStorageCommand(arg);
|
||||
boolean val = false;
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
cpLock(); // Block if a checkpointing is in progress on standby.
|
||||
writeLock();
|
||||
@ -4298,17 +4309,18 @@ boolean restoreFailedStorage(String arg) throws AccessControlException,
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
|
||||
// if it is disabled - enable it and vice versa.
|
||||
if(arg.equals("check"))
|
||||
return getFSImage().getStorage().getRestoreFailedStorage();
|
||||
|
||||
boolean val = arg.equals("true"); // false if not
|
||||
getFSImage().getStorage().setRestoreFailedStorage(val);
|
||||
|
||||
return val;
|
||||
if(arg.equals("check")) {
|
||||
val = getFSImage().getStorage().getRestoreFailedStorage();
|
||||
} else {
|
||||
val = arg.equals("true"); // false if not
|
||||
getFSImage().getStorage().setRestoreFailedStorage(val);
|
||||
}
|
||||
} finally {
|
||||
writeUnlock("restoreFailedStorage");
|
||||
writeUnlock(operationName);
|
||||
cpUnlock();
|
||||
}
|
||||
logAuditEvent(true, operationName, null);
|
||||
return val;
|
||||
}
|
||||
|
||||
Date getStartTime() {
|
||||
@ -4316,7 +4328,8 @@ Date getStartTime() {
|
||||
}
|
||||
|
||||
void finalizeUpgrade() throws IOException {
|
||||
checkSuperuserPrivilege();
|
||||
String operationName = "finalizeUpgrade";
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
cpLock(); // Block if a checkpointing is in progress on standby.
|
||||
writeLock();
|
||||
@ -4324,26 +4337,33 @@ void finalizeUpgrade() throws IOException {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
getFSImage().finalizeUpgrade(this.isHaEnabled() && inActiveState());
|
||||
} finally {
|
||||
writeUnlock("finalizeUpgrade");
|
||||
writeUnlock(operationName);
|
||||
cpUnlock();
|
||||
}
|
||||
logAuditEvent(true, operationName, null);
|
||||
}
|
||||
|
||||
void refreshNodes() throws IOException {
|
||||
String operationName = "refreshNodes";
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
checkSuperuserPrivilege();
|
||||
checkSuperuserPrivilege(operationName);
|
||||
getBlockManager().getDatanodeManager().refreshNodes(new HdfsConfiguration());
|
||||
logAuditEvent(true, operationName, null);
|
||||
}
|
||||
|
||||
void setBalancerBandwidth(long bandwidth) throws IOException {
|
||||
String operationName = "setBalancerBandwidth";
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
checkSuperuserPrivilege();
|
||||
checkSuperuserPrivilege(operationName);
|
||||
getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
|
||||
logAuditEvent(true, operationName, null);
|
||||
}
|
||||
|
||||
boolean setSafeMode(SafeModeAction action) throws IOException {
|
||||
String operationName = action.toString().toLowerCase();
|
||||
boolean error = false;
|
||||
if (action != SafeModeAction.SAFEMODE_GET) {
|
||||
checkSuperuserPrivilege();
|
||||
checkSuperuserPrivilege(operationName);
|
||||
switch(action) {
|
||||
case SAFEMODE_LEAVE: // leave safe mode
|
||||
leaveSafeMode(false);
|
||||
@ -4356,8 +4376,12 @@ boolean setSafeMode(SafeModeAction action) throws IOException {
|
||||
break;
|
||||
default:
|
||||
LOG.error("Unexpected safe mode action");
|
||||
error = true;
|
||||
}
|
||||
}
|
||||
if (!error) {
|
||||
logAuditEvent(true, operationName, null);
|
||||
}
|
||||
return isInSafeMode();
|
||||
}
|
||||
|
||||
@ -4493,7 +4517,9 @@ private synchronized void setManualAndResourceLowSafeMode(boolean manual,
|
||||
}
|
||||
|
||||
CheckpointSignature rollEditLog() throws IOException {
|
||||
checkSuperuserPrivilege();
|
||||
String operationName = "rollEditLog";
|
||||
CheckpointSignature result = null;
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.JOURNAL);
|
||||
writeLock();
|
||||
try {
|
||||
@ -4502,10 +4528,12 @@ CheckpointSignature rollEditLog() throws IOException {
|
||||
if (Server.isRpcInvocation()) {
|
||||
LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
|
||||
}
|
||||
return getFSImage().rollEditLog(getEffectiveLayoutVersion());
|
||||
result = getFSImage().rollEditLog(getEffectiveLayoutVersion());
|
||||
} finally {
|
||||
writeUnlock("rollEditLog");
|
||||
writeUnlock(operationName);
|
||||
}
|
||||
logAuditEvent(true, operationName, null);
|
||||
return result;
|
||||
}
|
||||
|
||||
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
|
||||
@ -6194,11 +6222,11 @@ void allowSnapshot(String path) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final String operationName = "allowSnapshot";
|
||||
boolean success = false;
|
||||
checkSuperuserPrivilege(operationName);
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot allow snapshot for " + path);
|
||||
checkSuperuserPrivilege();
|
||||
FSDirSnapshotOp.allowSnapshot(dir, snapshotManager, path);
|
||||
success = true;
|
||||
} finally {
|
||||
@ -6212,12 +6240,12 @@ void allowSnapshot(String path) throws IOException {
|
||||
void disallowSnapshot(String path) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final String operationName = "disallowSnapshot";
|
||||
checkSuperuserPrivilege(operationName);
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot disallow snapshot for " + path);
|
||||
checkSuperuserPrivilege();
|
||||
FSDirSnapshotOp.disallowSnapshot(dir, snapshotManager, path);
|
||||
success = true;
|
||||
} finally {
|
||||
@ -6410,7 +6438,8 @@ void removeSnapshottableDirs(List<INodeDirectory> toRemove) {
|
||||
}
|
||||
|
||||
RollingUpgradeInfo queryRollingUpgrade() throws IOException {
|
||||
checkSuperuserPrivilege();
|
||||
final String operationName = "queryRollingUpgrade";
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.READ);
|
||||
readLock();
|
||||
try {
|
||||
@ -6420,15 +6449,16 @@ RollingUpgradeInfo queryRollingUpgrade() throws IOException {
|
||||
Preconditions.checkNotNull(rollingUpgradeInfo);
|
||||
boolean hasRollbackImage = this.getFSImage().hasRollbackFSImage();
|
||||
rollingUpgradeInfo.setCreatedRollbackImages(hasRollbackImage);
|
||||
return rollingUpgradeInfo;
|
||||
} finally {
|
||||
readUnlock("queryRollingUpgrade");
|
||||
readUnlock(operationName);
|
||||
}
|
||||
logAuditEvent(true, operationName, null, null, null);
|
||||
return rollingUpgradeInfo;
|
||||
}
|
||||
|
||||
RollingUpgradeInfo startRollingUpgrade() throws IOException {
|
||||
final String operationName = "startRollingUpgrade";
|
||||
checkSuperuserPrivilege();
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
writeLock();
|
||||
try {
|
||||
@ -6619,7 +6649,7 @@ void checkRollingUpgrade(String action) throws RollingUpgradeException {
|
||||
|
||||
RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
|
||||
final String operationName = "finalizeRollingUpgrade";
|
||||
checkSuperuserPrivilege();
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
writeLock();
|
||||
try {
|
||||
@ -7744,5 +7774,38 @@ public int getNumEnteringMaintenanceDataNodes() {
|
||||
.size();
|
||||
}
|
||||
|
||||
void checkSuperuserPrivilege(String operationName)
|
||||
throws IOException {
|
||||
try {
|
||||
checkSuperuserPrivilege();
|
||||
} catch (AccessControlException ace) {
|
||||
logAuditEvent(false, operationName, null);
|
||||
throw ace;
|
||||
}
|
||||
}
|
||||
|
||||
String getQuotaCommand(long nsQuota, long dsQuota) {
|
||||
if (nsQuota == HdfsConstants.QUOTA_RESET
|
||||
&& dsQuota == HdfsConstants.QUOTA_DONT_SET) {
|
||||
return "clearQuota";
|
||||
} else if (nsQuota == HdfsConstants.QUOTA_DONT_SET
|
||||
&& dsQuota == HdfsConstants.QUOTA_RESET) {
|
||||
return "clearSpaceQuota";
|
||||
} else if (dsQuota == HdfsConstants.QUOTA_DONT_SET) {
|
||||
return "setQuota";
|
||||
} else {
|
||||
return "setSpaceQuota";
|
||||
}
|
||||
}
|
||||
|
||||
String getFailedStorageCommand(String mode) {
|
||||
if(mode.equals("check")) {
|
||||
return "checkRestoreFailedStorage";
|
||||
} else if (mode.equals("true")) {
|
||||
return "enableRestoreFailedStorage";
|
||||
} else {
|
||||
return "disableRestoreFailedStorage";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1624,6 +1624,7 @@ public void refreshServiceAcl() throws IOException {
|
||||
if (this.serviceRpcServer != null) {
|
||||
this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
|
||||
}
|
||||
namesystem.logAuditEvent(true, "refreshServiceAcl", null);
|
||||
}
|
||||
|
||||
@Override // RefreshAuthorizationPolicyProtocol
|
||||
@ -1631,17 +1632,19 @@ public void refreshUserToGroupsMappings() throws IOException {
|
||||
LOG.info("Refreshing all user-to-groups mappings. Requested by user: " +
|
||||
getRemoteUser().getShortUserName());
|
||||
Groups.getUserToGroupsMappingService().refresh();
|
||||
namesystem.logAuditEvent(true, "refreshUserToGroupsMappings", null);
|
||||
}
|
||||
|
||||
@Override // RefreshAuthorizationPolicyProtocol
|
||||
public void refreshSuperUserGroupsConfiguration() {
|
||||
public void refreshSuperUserGroupsConfiguration() throws IOException {
|
||||
LOG.info("Refreshing SuperUser proxy group mapping list ");
|
||||
|
||||
ProxyUsers.refreshSuperUserGroupsConfiguration();
|
||||
namesystem.logAuditEvent(true, "refreshSuperUserGroupsConfiguration", null);
|
||||
}
|
||||
|
||||
@Override // RefreshCallQueueProtocol
|
||||
public void refreshCallQueue() {
|
||||
public void refreshCallQueue() throws IOException {
|
||||
LOG.info("Refreshing call queue.");
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
@ -1649,6 +1652,7 @@ public void refreshCallQueue() {
|
||||
if (this.serviceRpcServer != null) {
|
||||
serviceRpcServer.refreshCallQueue(conf);
|
||||
}
|
||||
namesystem.logAuditEvent(true, "refreshCallQueue", null);
|
||||
}
|
||||
|
||||
@Override // GenericRefreshProtocol
|
||||
@ -2412,22 +2416,30 @@ public void disableErasureCodingPolicy(String ecPolicyName)
|
||||
@Override // ReconfigurationProtocol
|
||||
public void startReconfiguration() throws IOException {
|
||||
checkNNStartup();
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
String operationName = "startNamenodeReconfiguration";
|
||||
namesystem.checkSuperuserPrivilege(operationName);
|
||||
nn.startReconfigurationTask();
|
||||
namesystem.logAuditEvent(true, operationName, null);
|
||||
}
|
||||
|
||||
@Override // ReconfigurationProtocol
|
||||
public ReconfigurationTaskStatus getReconfigurationStatus()
|
||||
throws IOException {
|
||||
checkNNStartup();
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
return nn.getReconfigurationTaskStatus();
|
||||
String operationName = "getNamenodeReconfigurationStatus";
|
||||
namesystem.checkSuperuserPrivilege(operationName);
|
||||
ReconfigurationTaskStatus status = nn.getReconfigurationTaskStatus();
|
||||
namesystem.logAuditEvent(true, operationName, null);
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override // ReconfigurationProtocol
|
||||
public List<String> listReconfigurableProperties() throws IOException {
|
||||
checkNNStartup();
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
return Lists.newArrayList(nn.getReconfigurableProperties());
|
||||
String operationName = "listNamenodeReconfigurableProperties";
|
||||
namesystem.checkSuperuserPrivilege(operationName);
|
||||
List<String> result = Lists.newArrayList(nn.getReconfigurableProperties());
|
||||
namesystem.logAuditEvent(true, operationName, null);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -152,7 +152,6 @@ public void testWebHdfsAuditLogger() throws IOException, URISyntaxException {
|
||||
conf.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
|
||||
DummyAuditLogger.class.getName());
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
GetOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
|
||||
try {
|
||||
cluster.waitClusterUp();
|
||||
@ -168,7 +167,8 @@ public void testWebHdfsAuditLogger() throws IOException, URISyntaxException {
|
||||
conn.connect();
|
||||
assertEquals(200, conn.getResponseCode());
|
||||
conn.disconnect();
|
||||
assertEquals(1, DummyAuditLogger.logCount);
|
||||
assertEquals("getfileinfo", DummyAuditLogger.lastCommand);
|
||||
DummyAuditLogger.resetLogCount();
|
||||
assertEquals("127.0.0.1", DummyAuditLogger.remoteAddr);
|
||||
|
||||
// non-trusted proxied request
|
||||
@ -178,7 +178,9 @@ public void testWebHdfsAuditLogger() throws IOException, URISyntaxException {
|
||||
conn.connect();
|
||||
assertEquals(200, conn.getResponseCode());
|
||||
conn.disconnect();
|
||||
assertEquals(2, DummyAuditLogger.logCount);
|
||||
assertEquals("getfileinfo", DummyAuditLogger.lastCommand);
|
||||
assertTrue(DummyAuditLogger.logCount == 1);
|
||||
DummyAuditLogger.resetLogCount();
|
||||
assertEquals("127.0.0.1", DummyAuditLogger.remoteAddr);
|
||||
|
||||
// trusted proxied request
|
||||
@ -190,7 +192,8 @@ public void testWebHdfsAuditLogger() throws IOException, URISyntaxException {
|
||||
conn.connect();
|
||||
assertEquals(200, conn.getResponseCode());
|
||||
conn.disconnect();
|
||||
assertEquals(3, DummyAuditLogger.logCount);
|
||||
assertEquals("getfileinfo", DummyAuditLogger.lastCommand);
|
||||
assertTrue(DummyAuditLogger.logCount == 1);
|
||||
assertEquals("1.1.1.1", DummyAuditLogger.remoteAddr);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
@ -547,6 +550,7 @@ public static class DummyAuditLogger implements AuditLogger {
|
||||
static int unsuccessfulCount;
|
||||
static short foundPermission;
|
||||
static String remoteAddr;
|
||||
private static String lastCommand;
|
||||
|
||||
public void initialize(Configuration conf) {
|
||||
initialized = true;
|
||||
@ -565,11 +569,16 @@ public void logAuditEvent(boolean succeeded, String userName,
|
||||
if (!succeeded) {
|
||||
unsuccessfulCount++;
|
||||
}
|
||||
lastCommand = cmd;
|
||||
if (stat != null) {
|
||||
foundPermission = stat.getPermission().toShort();
|
||||
}
|
||||
}
|
||||
|
||||
public static String getLastCommand() {
|
||||
return lastCommand;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class BrokenAuditLogger implements AuditLogger {
|
||||
@ -581,7 +590,9 @@ public void initialize(Configuration conf) {
|
||||
public void logAuditEvent(boolean succeeded, String userName,
|
||||
InetAddress addr, String cmd, String src, String dst,
|
||||
FileStatus stat) {
|
||||
throw new RuntimeException("uh oh");
|
||||
if (!cmd.equals("datanodeReport")) {
|
||||
throw new RuntimeException("uh oh");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -31,24 +32,30 @@
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.After;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestAuditLoggerWithCommands {
|
||||
|
||||
@ -65,13 +72,15 @@ public class TestAuditLoggerWithCommands {
|
||||
static UserGroupInformation user2;
|
||||
private static NamenodeProtocols proto;
|
||||
|
||||
@BeforeClass
|
||||
public static void initialize() throws Exception {
|
||||
@Before
|
||||
public void initialize() throws Exception {
|
||||
// start a cluster
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,true);
|
||||
conf.setBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||
conf.setBoolean(
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, true);
|
||||
cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
|
||||
cluster.waitActive();
|
||||
@ -88,8 +97,9 @@ public static void initialize() throws Exception {
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
Server.getCurCall().set(null);
|
||||
fs.close();
|
||||
fs2.close();
|
||||
fileSys.close();
|
||||
@ -126,22 +136,29 @@ public void testSetQuota() throws Exception {
|
||||
Path path = new Path("/testdir/testdir1");
|
||||
fs.mkdirs(path);
|
||||
fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
|
||||
verifySetQuota(path, HdfsConstants.QUOTA_RESET,
|
||||
HdfsConstants.QUOTA_DONT_SET);
|
||||
verifySetQuota(path, HdfsConstants.QUOTA_DONT_SET,
|
||||
HdfsConstants.QUOTA_RESET);
|
||||
verifySetQuota(path, HdfsConstants.QUOTA_DONT_SET,
|
||||
HdfsConstants.BYTES_IN_INTEGER);
|
||||
verifySetQuota(path, HdfsConstants.BYTES_IN_INTEGER,
|
||||
HdfsConstants.BYTES_IN_INTEGER);
|
||||
fileSys.close();
|
||||
}
|
||||
|
||||
private void verifySetQuota(Path path, long nsQuota, long ssQuota)
|
||||
throws IOException {
|
||||
String operationName = cluster.getNamesystem().getQuotaCommand(
|
||||
nsQuota, ssQuota);
|
||||
String acePattern =
|
||||
".*allowed=false.*ugi=theDoctor.*cmd=.*" + operationName + ".*";
|
||||
try {
|
||||
((DistributedFileSystem)fileSys).setQuota(path, 10l, 10l);
|
||||
fail("The operation should have failed with AccessControlException");
|
||||
((DistributedFileSystem) fileSys).setQuota(path, nsQuota, ssQuota);
|
||||
fail("The operation should have failed");
|
||||
} catch (AccessControlException ace) {
|
||||
}
|
||||
String acePattern =
|
||||
".*allowed=false.*ugi=theDoctor.*cmd=setQuota.*";
|
||||
int length = verifyAuditLogs(acePattern);
|
||||
fileSys.close();
|
||||
try {
|
||||
((DistributedFileSystem)fileSys).setQuota(path, 10l, 10l);
|
||||
fail("The operation should have failed with IOException");
|
||||
} catch (IOException ace) {
|
||||
}
|
||||
assertTrue("Unexpected log from getContentSummary",
|
||||
length == auditlog.getOutput().split("\n").length);
|
||||
verifyAuditLogs(acePattern);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -180,7 +197,7 @@ public void testCreateRenameSnapShot() throws Exception {
|
||||
".*allowed=false.*ugi=theDoctor.*cmd=renameSnapshot.*";
|
||||
fs.mkdirs(srcDir);
|
||||
fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
|
||||
cluster.getNamesystem().allowSnapshot(srcDir.toString());
|
||||
((DistributedFileSystem)fs).allowSnapshot(srcDir);
|
||||
try {
|
||||
fileSys.createSnapshot(srcDir);
|
||||
fail("The operation should have failed with AccessControlException");
|
||||
@ -215,7 +232,7 @@ public void testDeleteSnapshot() throws Exception {
|
||||
Path s1;
|
||||
fs.mkdirs(srcDir);
|
||||
fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
|
||||
cluster.getNamesystem().allowSnapshot(srcDir.toString());
|
||||
((DistributedFileSystem)fs).allowSnapshot(srcDir);
|
||||
try {
|
||||
s1 = fs.createSnapshot(srcDir);
|
||||
fileSys.deleteSnapshot(srcDir, s1.getName());
|
||||
@ -236,13 +253,66 @@ public void testDeleteSnapshot() throws Exception {
|
||||
length+1 == auditlog.getOutput().split("\n").length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllowSnapshot() throws Exception {
|
||||
Path srcDir = new Path(System.getProperty("user.dir"), "/src");
|
||||
fs.mkdirs(srcDir);
|
||||
String pattern =
|
||||
".*allowed=true.*ugi=" +
|
||||
System.getProperty("user.name")+".*cmd=allowSnapshot.*";
|
||||
try {
|
||||
((DistributedFileSystem)fs).allowSnapshot(srcDir);
|
||||
verifyAuditLogs(pattern);
|
||||
} catch (Exception e) {
|
||||
fail("The operation should not have failed with Exception");
|
||||
}
|
||||
fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
|
||||
try {
|
||||
((DistributedFileSystem)fileSys).allowSnapshot(srcDir);
|
||||
fail("The operation should have failed with AccessControlException");
|
||||
} catch (AccessControlException ace) {
|
||||
}
|
||||
pattern =
|
||||
".*allowed=false.*ugi=theDoctor.*cmd=allowSnapshot.*";
|
||||
verifyAuditLogs(pattern);
|
||||
fs.delete(srcDir, true);
|
||||
fileSys.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisallowSnapshot() throws Exception {
|
||||
Path srcDir = new Path(System.getProperty("user.dir"), "/src");
|
||||
fs.mkdirs(srcDir);
|
||||
cluster.getNamesystem().allowSnapshot(srcDir.toString());
|
||||
String pattern =
|
||||
".*allowed=true.*ugi=" +
|
||||
System.getProperty("user.name")+".*cmd=disallowSnapshot.*";
|
||||
try {
|
||||
((DistributedFileSystem)fs).disallowSnapshot(srcDir);
|
||||
verifyAuditLogs(pattern);
|
||||
} catch (Exception e) {
|
||||
fail("The operation should not have failed with Exception");
|
||||
}
|
||||
cluster.getNamesystem().allowSnapshot(srcDir.toString());
|
||||
fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
|
||||
try {
|
||||
((DistributedFileSystem)fileSys).disallowSnapshot(srcDir);
|
||||
fail("The operation should have failed with AccessControlException");
|
||||
} catch (AccessControlException ace) {
|
||||
pattern =
|
||||
".*allowed=false.*ugi=theDoctor.*cmd=disallowSnapshot.*";
|
||||
verifyAuditLogs(pattern);
|
||||
}
|
||||
fileSys.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddCacheDirective() throws Exception {
|
||||
removeExistingCachePools(null);
|
||||
proto.addCachePool(new CachePoolInfo("pool1").
|
||||
setMode(new FsPermission((short) 0)));
|
||||
CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
|
||||
setPath(new Path("/alpha")).
|
||||
setPath(new Path(System.getProperty("user.dir"), "/alpha")).
|
||||
setPool("pool1").
|
||||
build();
|
||||
fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
|
||||
@ -618,6 +688,579 @@ private int verifyAuditLogs(final boolean allowed, final String pattern) {
|
||||
return verifyAuditLogs(".*allowed=" + allowed + pattern);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaSave() throws Exception {
|
||||
String aceMetaSave =
|
||||
".*allowed=true.*cmd=metaSave.*";
|
||||
try {
|
||||
((DistributedFileSystem)fs).metaSave("test.log");
|
||||
verifyAuditLogs(aceMetaSave);
|
||||
} catch (Exception e) {
|
||||
fail("The operation should not have failed with Exception");
|
||||
}
|
||||
try {
|
||||
((DistributedFileSystem)fileSys).metaSave("test.log");
|
||||
fail("The operation should have failed with AccessControlException");
|
||||
} catch (IOException ace) {
|
||||
GenericTestUtils.assertExceptionContains("Access denied", ace);
|
||||
aceMetaSave =
|
||||
".*allowed=false.*cmd=metaSave.*";
|
||||
verifyAuditLogs(aceMetaSave);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartReconfiguration() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=startNamenodeReconfiguration.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
((NameNodeRpcServer) cluster.getNameNodeRpc()).startReconfiguration();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("StartConfiguration should have passed!");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
((NameNodeRpcServer)cluster.getNameNodeRpc()).startReconfiguration();
|
||||
fail(
|
||||
"startNameNodeReconfiguration should throw AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=startNamenodeReconfiguration.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetReconfigurationStatus() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=getNamenodeReconfigurationStatus.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
((NameNodeRpcServer) cluster.getNameNodeRpc()).getReconfigurationStatus();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("getNamenodeReconfigurationStatus " +
|
||||
" threw Exception!");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
((NameNodeRpcServer)cluster.getNameNodeRpc()).getReconfigurationStatus();
|
||||
fail("getNamenodeReconfigurationStatus " +
|
||||
" did not throw AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=getNamenodeReconfigurationStatus.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListReconfigurableProperties() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=listNamenodeReconfigurableProperties.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
((NameNodeRpcServer) cluster.getNameNodeRpc()).
|
||||
listReconfigurableProperties();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("listReconfigurableProperties " +
|
||||
" threw Exception!");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
((NameNodeRpcServer)cluster.getNameNodeRpc()).
|
||||
listReconfigurableProperties();
|
||||
fail("getNamenodeReconfigurationStatus " +
|
||||
" did not throw AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=listNamenodeReconfigurableProperties.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshUserToGroupsMappings() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=refreshUserToGroupsMappings.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
((NameNodeRpcServer)cluster.getNameNodeRpc()).
|
||||
refreshUserToGroupsMappings();
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshSuperUserGroupsConfiguration() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=refreshSuperUserGroupsConfiguration.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
((NameNodeRpcServer) cluster.getNameNodeRpc()).
|
||||
refreshSuperUserGroupsConfiguration();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail(" The operation threw an exception");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshQueue() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=refreshCallQueue.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
((NameNodeRpcServer) cluster.getNameNodeRpc()).refreshCallQueue();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail(" The operation threw an exception");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshServiceAcl() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=refreshServiceAcl.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
((NameNodeRpcServer) cluster.getNameNodeRpc()).refreshServiceAcl();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail(" The operation threw an exception" + e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinalizeRollingUpgrade() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=finalizeRollingUpgrade.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
fsNamesystem.setRollingUpgradeInfo(false, System.currentTimeMillis());
|
||||
try {
|
||||
fsNamesystem.finalizeRollingUpgrade();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("finalizeRollingUpgrade threw Exception");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
fsNamesystem.finalizeRollingUpgrade();
|
||||
fail("finalizeRollingUpgrade should throw AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=finalizeRollingUpgrade.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueryRollingUpgrade() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=queryRollingUpgrade.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
fsNamesystem.setRollingUpgradeInfo(false, System.currentTimeMillis());
|
||||
try {
|
||||
fsNamesystem.queryRollingUpgrade();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("queryRollingUpgrade threw Exception");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
fsNamesystem.queryRollingUpgrade();
|
||||
fail("queryRollingUpgrade should have thrown an AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=queryRollingUpgrade.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollEditLog() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=rollEditLog.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
fsNamesystem.rollEditLog();
|
||||
} catch (Exception e) {
|
||||
fail("rollEditLog threw Exception");
|
||||
}
|
||||
verifyAuditLogs(auditLogString);
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
fsNamesystem.rollEditLog();
|
||||
fail("rollEditLog should have thrown an AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=rollEditLog.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetSafeMode() throws Exception {
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
verifySuccessfulSetSafeMode(fsNamesystem,
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
|
||||
verifySuccessfulSetSafeMode(fsNamesystem,
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_GET);
|
||||
verifySuccessfulSetSafeMode(fsNamesystem,
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||
verifySuccessfulSetSafeMode(fsNamesystem,
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_FORCE_EXIT);
|
||||
String auditLogString;
|
||||
auditLogString =
|
||||
".*allowed=true.*cmd=safemode_get.*";
|
||||
fsNamesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
|
||||
verifyAuditLogs(auditLogString);
|
||||
auditLogString =
|
||||
".*allowed=true.*cmd=safemode_leave.*";
|
||||
fsNamesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||
verifyAuditLogs(auditLogString);
|
||||
auditLogString =
|
||||
".*allowed=true.*cmd=safemode_force_exit.*";
|
||||
fsNamesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_FORCE_EXIT);
|
||||
verifyAuditLogs(auditLogString);
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
verifySafeModeAction(fsNamesystem,
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
|
||||
verifySafeModeAction(fsNamesystem,
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||
verifySafeModeAction(fsNamesystem,
|
||||
HdfsConstants.SafeModeAction.SAFEMODE_FORCE_EXIT);
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetBalancerBandwidth() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=setBalancerBandwidth.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
fsNamesystem.setBalancerBandwidth(10);
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("setBalancerBandwidth threw exception!");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
fsNamesystem.setBalancerBandwidth(10);
|
||||
fail(
|
||||
"setBalancerBandwidth should have thrown AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=setBalancerBandwidth.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshNodes() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=refreshNodes.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
fsNamesystem.refreshNodes();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("refreshNodes threw exception!");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
fsNamesystem.refreshNodes();
|
||||
fail(
|
||||
"refreshNodes should have thrown an AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=refreshNodes.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinalizeUpgrade() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=finalizeUpgrade.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
fsNamesystem.finalizeUpgrade();
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("finalizeUpgrade threw Exception");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
fsNamesystem.finalizeUpgrade();
|
||||
fail("finalizeUpgrade should have thrown an AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=finalizeUpgrade.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSaveNamespace() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=saveNamespace.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
fsNamesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
|
||||
try {
|
||||
fsNamesystem.saveNamespace(10, 10);
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("saveNamespace threw Exception");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
fsNamesystem.saveNamespace(10, 10);
|
||||
fail("saveNamespace should have thrown an AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=saveNamespace.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatanodeReport() throws Exception {
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=datanodeReport.*";
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
try {
|
||||
fsNamesystem.datanodeReport(HdfsConstants.DatanodeReportType.ALL);
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("datanodeReport threw Exception");
|
||||
}
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
try {
|
||||
fsNamesystem.datanodeReport(HdfsConstants.DatanodeReportType.ALL);
|
||||
fail(
|
||||
"datanodeReport should have thrown an AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=datanodeReport.*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestoreFailedStorage() throws Exception {
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
verifyAuditRestoreFailedStorage(fsNamesystem, "check");
|
||||
verifyAuditRestoreFailedStorage(fsNamesystem, "true");
|
||||
verifyAuditRestoreFailedStorage(fsNamesystem, "false");
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
verifyAuditRestoreFailedStorageACE(fsNamesystem, "check");
|
||||
verifyAuditRestoreFailedStorageACE(fsNamesystem, "true");
|
||||
verifyAuditRestoreFailedStorageACE(fsNamesystem, "false");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDatanodeStorageReport() throws Exception {
|
||||
FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
|
||||
when(fsNamesystem.isExternalInvocation()).thenReturn(true);
|
||||
Server.Call call = spy(new Server.Call(
|
||||
1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
|
||||
Server.getCurCall().set(call);
|
||||
DatanodeStorageReport[] reports = fsNamesystem.getDatanodeStorageReport(
|
||||
HdfsConstants.DatanodeReportType.ALL);
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=" + "getDatanodeStorageReport" + ".*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
when(call.getRemoteUser()).thenReturn(
|
||||
UserGroupInformation.createRemoteUser("theDoctor"));
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=" + "getDatanodeStorageReport" + ".*";
|
||||
try {
|
||||
fsNamesystem.getDatanodeStorageReport(
|
||||
HdfsConstants.DatanodeReportType.ALL);
|
||||
fail("Should have thrown an AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyAuditRestoreFailedStorageACE(
|
||||
FSNamesystem fsNamesystem, String arg) throws IOException {
|
||||
String operationName = fsNamesystem.getFailedStorageCommand(arg);
|
||||
try {
|
||||
fsNamesystem.restoreFailedStorage(arg);
|
||||
fail(
|
||||
"RestoreFailedStorage should have thrown AccessControlException!");
|
||||
} catch (IOException ace) {
|
||||
assertEquals("Unexpected Exception!",
|
||||
ace.getClass(), AccessControlException.class);
|
||||
String auditLogString =
|
||||
".*allowed=false.*cmd=" + operationName + ".*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyAuditRestoreFailedStorage(
|
||||
FSNamesystem fsNamesystem, String arg) throws IOException {
|
||||
String operationName = fsNamesystem.getFailedStorageCommand(arg);
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=" + operationName + ".*";
|
||||
try {
|
||||
fsNamesystem.restoreFailedStorage(arg);
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail(
|
||||
"The operation should not have failed with Exception");
|
||||
}
|
||||
}
|
||||
|
||||
private void verifySuccessfulSetSafeMode(FSNamesystem fsNamesystem,
|
||||
HdfsConstants.SafeModeAction safeModeAction) throws IOException {
|
||||
String operationName = safeModeAction.toString().toLowerCase();
|
||||
String auditLogString =
|
||||
".*allowed=true.*cmd=" + operationName +".*";
|
||||
try {
|
||||
fsNamesystem.setSafeMode(safeModeAction);
|
||||
verifyAuditLogs(auditLogString);
|
||||
} catch (Exception e) {
|
||||
fail("The operation should not have failed with Exception");
|
||||
}
|
||||
}
|
||||
|
||||
private void verifySafeModeAction(
|
||||
FSNamesystem fsNamesystem, HdfsConstants.SafeModeAction safeModeAction)
|
||||
throws IOException {
|
||||
String operationName = safeModeAction.toString().toLowerCase();
|
||||
String auditLogString;
|
||||
try {
|
||||
fsNamesystem.setSafeMode(safeModeAction);
|
||||
fail("setSafeMode should have thrown an AccessControlException!");
|
||||
} catch (AccessControlException ace) {
|
||||
auditLogString =
|
||||
".*allowed=false.*cmd=" + operationName +".*";
|
||||
verifyAuditLogs(auditLogString);
|
||||
}
|
||||
}
|
||||
|
||||
private int verifyAuditLogs(String pattern) {
|
||||
int length = auditlog.getOutput().split("\n").length;
|
||||
String lastAudit = auditlog.getOutput().split("\n")[length - 1];
|
||||
@ -633,4 +1276,3 @@ private void removeExistingCachePools(String prevPool) throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -556,13 +556,18 @@ public void testTopUsers() throws Exception {
|
||||
assertEquals("Unexpected num windows", 3, windows.size());
|
||||
for (Map<String, List<Map<String, Object>>> window : windows) {
|
||||
final List<Map<String, Object>> ops = window.get("ops");
|
||||
assertEquals("Unexpected num ops", 3, ops.size());
|
||||
assertEquals("Unexpected num ops", 4, ops.size());
|
||||
for (Map<String, Object> op: ops) {
|
||||
if (op.get("opType").equals("datanodeReport")) {
|
||||
continue;
|
||||
}
|
||||
final long count = Long.parseLong(op.get("totalCount").toString());
|
||||
final String opType = op.get("opType").toString();
|
||||
final int expected;
|
||||
if (opType.equals(TopConf.ALL_CMDS)) {
|
||||
expected = 2*NUM_OPS;
|
||||
expected = 2 * NUM_OPS + 2;
|
||||
} else if (opType.equals("datanodeReport")) {
|
||||
expected = 2;
|
||||
} else {
|
||||
expected = NUM_OPS;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user