MAPREDUCE-7309. Improve performance of reading resource request for mapper/reducers from config. Contributed by Peter Bacsko & Wangda Tan.

This commit is contained in:
Peter Bacsko 2020-11-25 11:36:58 +01:00
parent 9dd74141a6
commit 8ed565382f
2 changed files with 18 additions and 1 deletions

View File

@ -158,6 +158,9 @@ public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt, org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> { EventHandler<TaskAttemptEvent> {
@VisibleForTesting
protected final static Map<TaskType, Resource> RESOURCE_REQUEST_CACHE
= new HashMap<>();
static final Counters EMPTY_COUNTERS = new Counters(); static final Counters EMPTY_COUNTERS = new Counters();
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TaskAttemptImpl.class); LoggerFactory.getLogger(TaskAttemptImpl.class);
@ -172,7 +175,7 @@ public abstract class TaskAttemptImpl implements
private final Clock clock; private final Clock clock;
private final org.apache.hadoop.mapred.JobID oldJobId; private final org.apache.hadoop.mapred.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener; private final TaskAttemptListener taskAttemptListener;
private final Resource resourceCapability; private Resource resourceCapability;
protected Set<String> dataLocalHosts; protected Set<String> dataLocalHosts;
protected Set<String> dataLocalRacks; protected Set<String> dataLocalRacks;
private final List<String> diagnostics = new ArrayList<String>(); private final List<String> diagnostics = new ArrayList<String>();
@ -707,6 +710,10 @@ private void populateResourceCapability(TaskType taskType) {
getResourceTypePrefix(taskType); getResourceTypePrefix(taskType);
boolean memorySet = false; boolean memorySet = false;
boolean cpuVcoresSet = false; boolean cpuVcoresSet = false;
if (RESOURCE_REQUEST_CACHE.get(taskType) != null) {
resourceCapability = RESOURCE_REQUEST_CACHE.get(taskType);
return;
}
if (resourceTypePrefix != null) { if (resourceTypePrefix != null) {
List<ResourceInformation> resourceRequests = List<ResourceInformation> resourceRequests =
ResourceUtils.getRequestedResourcesFromConfig(conf, ResourceUtils.getRequestedResourcesFromConfig(conf,
@ -767,6 +774,9 @@ private void populateResourceCapability(TaskType taskType) {
if (!cpuVcoresSet) { if (!cpuVcoresSet) {
this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType)); this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
} }
RESOURCE_REQUEST_CACHE.put(taskType, resourceCapability);
LOG.info("Resource capability of task type {} is set to {}",
taskType, resourceCapability);
} }
private String getCpuVcoresKey(TaskType taskType) { private String getCpuVcoresKey(TaskType taskType) {

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider; import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -155,6 +156,11 @@ public static void setupBeforeClass() {
ResourceUtils.resetResourceTypes(new Configuration()); ResourceUtils.resetResourceTypes(new Configuration());
} }
@Before
public void before() {
TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear();
}
@After @After
public void tearDown() { public void tearDown() {
ResourceUtils.resetResourceTypes(new Configuration()); ResourceUtils.resetResourceTypes(new Configuration());
@ -1721,6 +1727,7 @@ public void testReducerMemoryRequestOverriding() {
TestAppender testAppender = new TestAppender(); TestAppender testAppender = new TestAppender();
final Logger logger = Logger.getLogger(TaskAttemptImpl.class); final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
try { try {
TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear();
logger.addAppender(testAppender); logger.addAppender(testAppender);
EventHandler eventHandler = mock(EventHandler.class); EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance(); Clock clock = SystemClock.getInstance();