Addendum to MAPREDUCE-6451

This commit is contained in:
Kihwal Lee 2015-10-30 16:05:23 -05:00
parent 2868ca0328
commit b24fe06483

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred.lib;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.tools.DistCpConstants;
import java.io.IOException;
/**
* Class to initialize the DynamicInputChunk invariants.
*/
class DynamicInputChunkContext<K, V> {
private static Log LOG = LogFactory.getLog(DynamicInputChunkContext.class);
private Configuration configuration;
private Path chunkRootPath = null;
private String chunkFilePrefix;
private FileSystem fs;
private int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
public DynamicInputChunkContext(Configuration config)
throws IOException {
this.configuration = config;
Path listingFilePath = new Path(getListingFilePath(configuration));
chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
fs = chunkRootPath.getFileSystem(configuration);
chunkFilePrefix = listingFilePath.getName() + ".chunk.";
}
public Configuration getConfiguration() {
return configuration;
}
public Path getChunkRootPath() {
return chunkRootPath;
}
public String getChunkFilePrefix() {
return chunkFilePrefix;
}
public FileSystem getFs() {
return fs;
}
private static String getListingFilePath(Configuration configuration) {
final String listingFileString = configuration.get(
DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
assert !listingFileString.equals("") : "Listing file not found.";
return listingFileString;
}
public int getNumChunksLeft() {
return numChunksLeft;
}
public DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
String taskId
= taskAttemptContext.getTaskAttemptID().getTaskID().toString();
Path acquiredFilePath = new Path(getChunkRootPath(), taskId);
if (fs.exists(acquiredFilePath)) {
LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
return new DynamicInputChunk(acquiredFilePath, taskAttemptContext, this);
}
for (FileStatus chunkFile : getListOfChunkFiles()) {
if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
LOG.info(taskId + " acquired " + chunkFile.getPath());
return new DynamicInputChunk(acquiredFilePath, taskAttemptContext,
this);
}
}
return null;
}
public DynamicInputChunk createChunkForWrite(String chunkId)
throws IOException {
return new DynamicInputChunk(chunkId, this);
}
public FileStatus [] getListOfChunkFiles() throws IOException {
Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
numChunksLeft = chunkFiles.length;
return chunkFiles;
}
}