MAPREDUCE-6410. Fixed MapReduce JobHistory server to use the right (login) UGI to refresh log and cleaner settings. Contributed by Varun Saxena.
This commit is contained in:
parent
41ae7768eb
commit
d481684c7c
@ -548,6 +548,9 @@ Release 2.7.1 - UNRELEASED
|
|||||||
MAPREDUCE-6387. Serialize the recently added Task#encryptedSpillKey field at
|
MAPREDUCE-6387. Serialize the recently added Task#encryptedSpillKey field at
|
||||||
the end. (Arun Suresh via kasha)
|
the end. (Arun Suresh via kasha)
|
||||||
|
|
||||||
|
MAPREDUCE-6410. Fixed MapReduce JobHistory server to use the right (login)
|
||||||
|
UGI to refresh log and cleaner settings. (Varun Saxena via vinodkv)
|
||||||
|
|
||||||
Release 2.7.0 - 2015-04-20
|
Release 2.7.0 - 2015-04-20
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -52,6 +53,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService;
|
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolServerSideTranslatorPB;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@ -67,6 +69,8 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
|
|||||||
private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
|
private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
|
||||||
private JobHistory jobHistoryService = null;
|
private JobHistory jobHistoryService = null;
|
||||||
|
|
||||||
|
private UserGroupInformation loginUGI;
|
||||||
|
|
||||||
public HSAdminServer(AggregatedLogDeletionService aggLogDelService,
|
public HSAdminServer(AggregatedLogDeletionService aggLogDelService,
|
||||||
JobHistory jobHistoryService) {
|
JobHistory jobHistoryService) {
|
||||||
super(HSAdminServer.class.getName());
|
super(HSAdminServer.class.getName());
|
||||||
@ -125,9 +129,24 @@ public void serviceInit(Configuration conf) throws Exception {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
loginUGI = UserGroupInformation.getLoginUser();
|
||||||
|
} else {
|
||||||
|
loginUGI = UserGroupInformation.getCurrentUser();
|
||||||
|
}
|
||||||
clientRpcServer.start();
|
clientRpcServer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
UserGroupInformation getLoginUGI() {
|
||||||
|
return loginUGI;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setLoginUGI(UserGroupInformation ugi) {
|
||||||
|
loginUGI = ugi;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStop() throws Exception {
|
protected void serviceStop() throws Exception {
|
||||||
if (clientRpcServer != null) {
|
if (clientRpcServer != null) {
|
||||||
@ -233,7 +252,17 @@ public void refreshLoadedJobCache() throws IOException {
|
|||||||
public void refreshLogRetentionSettings() throws IOException {
|
public void refreshLogRetentionSettings() throws IOException {
|
||||||
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
|
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
|
||||||
|
|
||||||
aggLogDelService.refreshLogRetentionSettings();
|
try {
|
||||||
|
loginUGI.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws IOException {
|
||||||
|
aggLogDelService.refreshLogRetentionSettings();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
HSAuditLogger.logSuccess(user.getShortUserName(),
|
HSAuditLogger.logSuccess(user.getShortUserName(),
|
||||||
"refreshLogRetentionSettings", "HSAdminServer");
|
"refreshLogRetentionSettings", "HSAdminServer");
|
||||||
@ -243,7 +272,17 @@ public void refreshLogRetentionSettings() throws IOException {
|
|||||||
public void refreshJobRetentionSettings() throws IOException {
|
public void refreshJobRetentionSettings() throws IOException {
|
||||||
UserGroupInformation user = checkAcls("refreshJobRetentionSettings");
|
UserGroupInformation user = checkAcls("refreshJobRetentionSettings");
|
||||||
|
|
||||||
jobHistoryService.refreshJobRetentionSettings();
|
try {
|
||||||
|
loginUGI.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws IOException {
|
||||||
|
jobHistoryService.refreshJobRetentionSettings();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
HSAuditLogger.logSuccess(user.getShortUserName(),
|
HSAuditLogger.logSuccess(user.getShortUserName(),
|
||||||
"refreshJobRetentionSettings", HISTORY_ADMIN_SERVER);
|
"refreshJobRetentionSettings", HISTORY_ADMIN_SERVER);
|
||||||
|
@ -21,6 +21,8 @@
|
|||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -45,6 +47,9 @@
|
|||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
import org.junit.runners.Parameterized.Parameters;
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.any;
|
||||||
|
import static org.mockito.Mockito.reset;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
@ -286,6 +291,56 @@ public void testRefreshJobRetentionSettings() throws Exception {
|
|||||||
verify(jobHistoryService).refreshJobRetentionSettings();
|
verify(jobHistoryService).refreshJobRetentionSettings();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void testUGIForLogAndJobRefresh() throws Exception {
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createUserForTesting("test", new String[] {"grp"});
|
||||||
|
UserGroupInformation loginUGI = spy(hsAdminServer.getLoginUGI());
|
||||||
|
hsAdminServer.setLoginUGI(loginUGI);
|
||||||
|
|
||||||
|
// Run refresh log retention settings with test user
|
||||||
|
ugi.doAs(new PrivilegedAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() {
|
||||||
|
String[] args = new String[1];
|
||||||
|
args[0] = "-refreshLogRetentionSettings";
|
||||||
|
try {
|
||||||
|
hsAdminClient.run(args);
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("refreshLogRetentionSettings should have been successful");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Verify if AggregatedLogDeletionService#refreshLogRetentionSettings was
|
||||||
|
// called with login UGI, instead of the UGI command was run with.
|
||||||
|
verify(loginUGI).doAs(any(PrivilegedExceptionAction.class));
|
||||||
|
verify(alds).refreshLogRetentionSettings();
|
||||||
|
|
||||||
|
// Reset for refresh job retention settings
|
||||||
|
reset(loginUGI);
|
||||||
|
|
||||||
|
// Run refresh job retention settings with test user
|
||||||
|
ugi.doAs(new PrivilegedAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() {
|
||||||
|
String[] args = new String[1];
|
||||||
|
args[0] = "-refreshJobRetentionSettings";
|
||||||
|
try {
|
||||||
|
hsAdminClient.run(args);
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("refreshJobRetentionSettings should have been successful");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Verify if JobHistory#refreshJobRetentionSettings was called with
|
||||||
|
// login UGI, instead of the UGI command was run with.
|
||||||
|
verify(loginUGI).doAs(any(PrivilegedExceptionAction.class));
|
||||||
|
verify(jobHistoryService).refreshJobRetentionSettings();
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void cleanUp() {
|
public void cleanUp() {
|
||||||
if (hsAdminServer != null)
|
if (hsAdminServer != null)
|
||||||
|
Loading…
Reference in New Issue
Block a user