MAPREDUCE-6197. Cache MapOutputLocations in ShuffleHandler. Contributed by Junping Du

This commit is contained in:
Jian He 2016-06-21 14:25:58 -07:00
parent b2c596cdda
commit d8107fcd1c
2 changed files with 168 additions and 47 deletions

View File

@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -131,6 +132,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.cache.Weigher;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@ -156,6 +163,9 @@ public class ShuffleHandler extends AuxiliaryService {
protected static final Version CURRENT_VERSION_INFO =
Version.newInstance(1, 0);
private static final String DATA_FILE_NAME = "file.out";
private static final String INDEX_FILE_NAME = "file.out.index";
private int port;
private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
@ -294,12 +304,12 @@ private static class ReduceContext {
private ChannelHandlerContext ctx;
private String user;
private Map<String, Shuffle.MapOutputInfo> infoMap;
private String outputBasePathStr;
private String jobId;
public ReduceContext(List<String> mapIds, int rId,
ChannelHandlerContext context, String usr,
Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
String outputBasePath) {
String jobId) {
this.mapIds = mapIds;
this.reduceId = rId;
@ -319,7 +329,7 @@ public ReduceContext(List<String> mapIds, int rId,
this.ctx = context;
this.user = usr;
this.infoMap = mapOutputInfoMap;
this.outputBasePathStr = outputBasePath;
this.jobId = jobId;
}
public int getReduceId() {
@ -338,8 +348,8 @@ public Map<String, Shuffle.MapOutputInfo> getInfoMap() {
return infoMap;
}
public String getOutputBasePathStr() {
return outputBasePathStr;
public String getJobId() {
return jobId;
}
public List<String> getMapIds() {
@ -780,18 +790,63 @@ public ChannelPipeline getPipeline() throws Exception {
class Shuffle extends SimpleChannelUpstreamHandler {
private static final int MAX_WEIGHT = 10 * 1024 * 1024;
private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
private static final int ALLOWED_CONCURRENCY = 16;
private final Configuration conf;
private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
private int port;
private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES,
TimeUnit.MINUTES).softValues().concurrencyLevel(ALLOWED_CONCURRENCY).
removalListener(
new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>() {
@Override
public void onRemoval(RemovalNotification<AttemptPathIdentifier,
AttemptPathInfo> notification) {
if (LOG.isDebugEnabled()) {
LOG.debug("PathCache Eviction: " + notification.getKey() +
", Reason=" + notification.getCause());
}
}
}
).maximumWeight(MAX_WEIGHT).weigher(
new Weigher<AttemptPathIdentifier, AttemptPathInfo>() {
@Override
public int weigh(AttemptPathIdentifier key,
AttemptPathInfo value) {
return key.jobId.length() + key.user.length() +
key.attemptId.length()+
value.indexPath.toString().length() +
value.dataPath.toString().length();
}
}
).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() {
@Override
public AttemptPathInfo load(AttemptPathIdentifier key) throws
Exception {
String base = getBaseLocation(key.jobId, key.user);
String attemptBase = base + key.attemptId;
Path indexFileName = lDirAlloc.getLocalPathToRead(
attemptBase + "/" + INDEX_FILE_NAME, conf);
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
attemptBase + "/" + DATA_FILE_NAME, conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded : " + key + " via loader");
}
return new AttemptPathInfo(indexFileName, mapOutputFileName);
}
});
public Shuffle(Configuration conf) {
this.conf = conf;
indexCache = new IndexCache(new JobConf(conf));
this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
}
public void setPort(int port) {
this.port = port;
}
@ -908,13 +963,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
Channel ch = evt.getChannel();
String user = userRsrc.get(jobId);
// $x/$user/appcache/$appId/output/$mapId
// TODO: Once Shuffle is out of NM, this can use MR APIs to convert
// between App and Job
String outputBasePathStr = getBaseLocation(jobId, user);
try {
populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
populateHeaders(mapIds, jobId, user, reduceId, request,
response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
ch.write(response);
@ -926,7 +976,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
ch.write(response);
//Initialize one ReduceContext object per messageReceived call
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
user, mapOutputInfoMap, outputBasePathStr);
user, mapOutputInfoMap, jobId);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
ChannelFuture nextMap = sendMap(reduceContext);
if(nextMap == null) {
@ -957,9 +1007,8 @@ public ChannelFuture sendMap(ReduceContext reduceContext)
try {
MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
if (info == null) {
info = getMapOutputInfo(reduceContext.getOutputBasePathStr() +
mapId, mapId, reduceContext.getReduceId(),
reduceContext.getUser());
info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
reduceContext.getJobId(), reduceContext.getUser());
}
nextMap = sendMapOutput(
reduceContext.getCtx(),
@ -1003,46 +1052,58 @@ private String getBaseLocation(String jobId, String user) {
return baseStr;
}
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
// Index file
Path indexFileName =
lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
IndexRecord info =
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
Path mapOutputFileName =
lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
if (LOG.isDebugEnabled()) {
LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
String jobId, String user) throws IOException {
AttemptPathInfo pathInfo;
try {
AttemptPathIdentifier identifier = new AttemptPathIdentifier(
jobId, user, mapId);
pathInfo = pathCache.get(identifier);
if (LOG.isDebugEnabled()) {
LOG.debug("Retrieved pathInfo for " + identifier +
" check for corresponding loaded messages to determine whether" +
" it was loaded or cached");
}
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}
MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
IndexRecord info =
indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
if (LOG.isDebugEnabled()) {
LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
",dataFile=" + pathInfo.dataPath + ", indexFile=" +
pathInfo.indexPath);
}
MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info);
return outputInfo;
}
protected void populateHeaders(List<String> mapIds, String outputBaseStr,
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request, HttpResponse response,
boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
throws IOException {
long contentLength = 0;
for (String mapId : mapIds) {
String base = outputBaseStr + mapId;
MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
mapOutputInfoMap.put(mapId, outputInfo);
}
// Index file
Path indexFileName =
lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
IndexRecord info =
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
ShuffleHeader header =
new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
outputInfo.indexRecord.rawLength, reduce);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
contentLength += info.partLength;
contentLength += outputInfo.indexRecord.partLength;
contentLength += dob.getLength();
}
@ -1215,4 +1276,64 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
}
}
}
static class AttemptPathInfo {
// TODO Change this over to just store local dir indices, instead of the
// entire path. Far more efficient.
private final Path indexPath;
private final Path dataPath;
public AttemptPathInfo(Path indexPath, Path dataPath) {
this.indexPath = indexPath;
this.dataPath = dataPath;
}
}
static class AttemptPathIdentifier {
private final String jobId;
private final String user;
private final String attemptId;
public AttemptPathIdentifier(String jobId, String user, String attemptId) {
this.jobId = jobId;
this.user = user;
this.attemptId = attemptId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AttemptPathIdentifier that = (AttemptPathIdentifier) o;
if (!attemptId.equals(that.attemptId)) {
return false;
}
if (!jobId.equals(that.jobId)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = jobId.hashCode();
result = 31 * result + attemptId.hashCode();
return result;
}
@Override
public String toString() {
return "AttemptPathIdentifier{" +
"attemptId='" + attemptId + '\'' +
", jobId='" + jobId + '\'' +
'}';
}
}
}

View File

@ -110,8 +110,8 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
throws IOException {
}
@Override
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
String jobId, String user) throws IOException {
// Do nothing.
return null;
}
@ -230,8 +230,8 @@ protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
String jobId, String user) throws IOException {
return null;
}
@Override
@ -325,8 +325,8 @@ protected Shuffle getShuffle(final Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
String jobId, String user) throws IOException {
return null;
}
@Override
@ -534,8 +534,8 @@ protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
String jobId, String user) throws IOException {
// Do nothing.
return null;
}