MAPREDUCE-6363. [NNBench] Lease mismatch error when running with multiple mappers. Contributed by Vlad Sharanhovich and Bibin A Chundatt.

Akira Ajisaka 2016-01-14 10:40:22 +09:00
parent 8315582c4f
commit 7b0964f354
2 changed files with 73 additions and 66 deletions
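
In short: each NNBench mapper used to build its working file names from the host name alone ("file_" + hostName + "_"), so mappers scheduled on the same host opened and wrote the same HDFS paths and tripped over each other's NameNode leases. This change writes a distinct index into each mapper's control file and derives file names from that per-mapper value instead ("file_" + value). A minimal sketch of the naming change, with hypothetical class and method names that are not part of NNBench:

    public final class NNBenchNamingSketch {
      // Before the patch: one prefix per host, shared by every mapper there.
      static String perHostPrefix(String hostName) {
        return "file_" + hostName + "_";  // collides when mappers share a host
      }

      // After the patch: one name per mapper, from its control-file value.
      static String perMapperName(long mapperValue) {
        return "file_" + mapperValue;     // unique per mapper, no lease contention
      }

      public static void main(String[] args) {
        System.out.println(perHostPrefix("host-1"));  // file_host-1_
        System.out.println(perMapperName(7L));        // file_7
      }
    }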

CHANGES.txt

@@ -1009,6 +1009,9 @@ Release 2.6.4 - UNRELEASED
     TaskAttemptImpl#sendJHStartEventForAssignedFailTask (Bibin A Chundatt via
     jlowe)
 
+    MAPREDUCE-6363. [NNBench] Lease mismatch error when running with multiple
+    mappers. (Vlad Sharanhovich and Bibin A Chundatt via aajisaka)
+
 Release 2.6.3 - 2015-12-17
 
   INCOMPATIBLE CHANGES

NNBench.java

@@ -18,45 +18,42 @@
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-import java.util.Date;
+import java.io.BufferedReader;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
-import java.io.File;
-import java.io.BufferedReader;
-import java.util.StringTokenizer;
 import java.net.InetAddress;
 import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Iterator;
+import java.util.StringTokenizer;
 
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 
 /**
  * This program executes a specified operation that applies load to
@@ -149,7 +146,7 @@ private static void createControlFiles() throws IOException {
       try {
         writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
             LongWritable.class, CompressionType.NONE);
-        writer.append(new Text(strFileName), new LongWritable(0l));
+        writer.append(new Text(strFileName), new LongWritable(i));
       } finally {
         if (writer != null) {
           writer.close();
@@ -309,14 +306,7 @@ public static void parseInputs(final String[] args) {
    */
   private static void analyzeResults() throws IOException {
     final FileSystem fs = FileSystem.get(config);
-    Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
-        "part-00000");
-
-    DataInputStream in;
-    in = new DataInputStream(fs.open(reduceFile));
-
-    BufferedReader lines;
-    lines = new BufferedReader(new InputStreamReader(in));
+    Path reduceDir = new Path(baseDir, OUTPUT_DIR_NAME);
 
     long totalTimeAL1 = 0l;
     long totalTimeAL2 = 0l;
@@ -327,32 +317,38 @@ private static void analyzeResults() throws IOException {
     long mapStartTimeTPmS = 0l;
     long mapEndTimeTPmS = 0l;
 
-    String resultTPSLine1 = null;
-    String resultTPSLine2 = null;
-    String resultALLine1 = null;
-    String resultALLine2 = null;
-
-    String line;
-    while((line = lines.readLine()) != null) {
-      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
-      String attr = tokens.nextToken();
-      if (attr.endsWith(":totalTimeAL1")) {
-        totalTimeAL1 = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":totalTimeAL2")) {
-        totalTimeAL2 = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":totalTimeTPmS")) {
-        totalTimeTPmS = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":latemaps")) {
-        lateMaps = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":numOfExceptions")) {
-        numOfExceptions = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":successfulFileOps")) {
-        successfulFileOps = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":mapStartTimeTPmS")) {
-        mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":mapEndTimeTPmS")) {
-        mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
-      }
-    }
+    FileStatus[] fss = fs.listStatus(reduceDir);
+    for (FileStatus status : fss) {
+
+      Path reduceFile = status.getPath();
+      DataInputStream in;
+      in = new DataInputStream(fs.open(reduceFile));
+
+      BufferedReader lines;
+      lines = new BufferedReader(new InputStreamReader(in));
+
+      String line;
+      while ((line = lines.readLine()) != null) {
+        StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
+        String attr = tokens.nextToken();
+        if (attr.endsWith(":totalTimeAL1")) {
+          totalTimeAL1 = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":totalTimeAL2")) {
+          totalTimeAL2 = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":totalTimeTPmS")) {
+          totalTimeTPmS = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":latemaps")) {
+          lateMaps = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":numOfExceptions")) {
+          numOfExceptions = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":successfulFileOps")) {
+          successfulFileOps = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":mapStartTimeTPmS")) {
+          mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":mapEndTimeTPmS")) {
+          mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
+        }
+      }
+    }
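
Because the job can run with more than one reducer, the results may land in several part files, so the analysis above now lists the whole output directory and folds every part file into the same counters instead of reading only part-00000. The same read pattern in isolation; the output path is illustrative:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadAllParts {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path outputDir = new Path("/tmp/nnbench-demo/output");
        for (FileStatus status : fs.listStatus(outputDir)) {  // every part file
          try (BufferedReader lines = new BufferedReader(
              new InputStreamReader(fs.open(status.getPath())))) {
            String line;
            while ((line = lines.readLine()) != null) {
              System.out.println(status.getPath().getName() + ": " + line);
            }
          }
        }
      }
    }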
@@ -377,6 +373,11 @@ private static void analyzeResults() throws IOException {
         (double) successfulFileOps :
         (double) totalTimeTPmS / successfulFileOps;
 
+    String resultTPSLine1 = null;
+    String resultTPSLine2 = null;
+    String resultALLine1 = null;
+    String resultALLine2 = null;
+
     if (operation.equals(OP_CREATE_WRITE)) {
       // For create/write/close, it is treated as two transactions,
       // since a file create from a client perspective involves create and close
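
The comment above carries the arithmetic behind the create/write TPS figures: one user-visible file creation is billed as two NameNode transactions (create plus close). A worked example with assumed numbers; the exact formula is illustrative rather than copied from NNBench:

    public class TpsExample {
      public static void main(String[] args) {
        long successfulFileOps = 1000L;  // create/write/close ops that succeeded
        long totalTimeTPmS = 4000L;      // total transaction time, in ms
        // Each op counts as two transactions: 2000 transactions in 4 s = 500 TPS.
        double tps = (2.0 * successfulFileOps * 1000) / totalTimeTPmS;
        System.out.println(tps);         // 500.0
      }
    }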
@@ -699,18 +700,21 @@ public void map(Text key,
       successfulFileOps = 0l;
 
       if (barrier()) {
+        String fileName = "file_" + value;
         if (op.equals(OP_CREATE_WRITE)) {
           startTimeTPmS = System.currentTimeMillis();
-          doCreateWriteOp("file_" + hostName + "_", reporter);
+          doCreateWriteOp(fileName, reporter);
         } else if (op.equals(OP_OPEN_READ)) {
           startTimeTPmS = System.currentTimeMillis();
-          doOpenReadOp("file_" + hostName + "_", reporter);
+          doOpenReadOp(fileName, reporter);
         } else if (op.equals(OP_RENAME)) {
           startTimeTPmS = System.currentTimeMillis();
-          doRenameOp("file_" + hostName + "_", reporter);
+          doRenameOp(fileName, reporter);
         } else if (op.equals(OP_DELETE)) {
           startTimeTPmS = System.currentTimeMillis();
-          doDeleteOp("file_" + hostName + "_", reporter);
+        } else {
+          throw new IllegalArgumentException(
+              "unsupported operation [" + op + "]");
         }
 
         endTimeTPms = System.currentTimeMillis();
@@ -777,9 +781,8 @@ private void doCreateWriteOp(String name,
         reporter.setStatus("Finish "+ l + " files");
       } catch (IOException e) {
-        LOG.info("Exception recorded in op: " +
-            "Create/Write/Close");
+        LOG.error("Exception recorded in op: Create/Write/Close, "
+            + "file: \"" + filePath + "\"", e);
         numOfExceptions++;
       }
     }
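
This and the three logging hunks below follow one pattern: the catch blocks now log at error level, name the file involved, and pass the exception itself as the second argument to Log.error, which is what gets the stack trace into the log (the old LOG.info string dropped it). A small sketch of the difference using the same commons-logging types NNBench imports; the path is illustrative:

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LoggingSketch {
      private static final Log LOG = LogFactory.getLog(LoggingSketch.class);

      public static void main(String[] args) {
        String filePath = "/tmp/demo/file_0";  // illustrative
        try {
          throw new IOException("simulated failure");
        } catch (IOException e) {
          // Old style: message only, stack trace discarded.
          LOG.info("Exception recorded in op: Create/Write/Close");
          // New style: message plus throwable, stack trace preserved.
          LOG.error("Exception recorded in op: Create/Write/Close, "
              + "file: \"" + filePath + "\"", e);
        }
      }
    }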
@@ -822,7 +825,8 @@ private void doOpenReadOp(String name,
         reporter.setStatus("Finish "+ l + " files");
       } catch (IOException e) {
-        LOG.info("Exception recorded in op: OpenRead " + e);
+        LOG.error("Exception recorded in op: OpenRead, " + "file: \""
+            + filePath + "\"", e);
         numOfExceptions++;
       }
     }
@@ -856,8 +860,8 @@ private void doRenameOp(String name,
         reporter.setStatus("Finish "+ l + " files");
       } catch (IOException e) {
-        LOG.info("Exception recorded in op: Rename");
+        LOG.error("Exception recorded in op: Rename, " + "file: \""
+            + filePath + "\"", e);
         numOfExceptions++;
       }
     }
@@ -889,8 +893,8 @@ private void doDeleteOp(String name,
         reporter.setStatus("Finish "+ l + " files");
       } catch (IOException e) {
-        LOG.info("Exception in recorded op: Delete");
+        LOG.error("Exception recorded in op: Delete, " + "file: \""
+            + filePath + "\"", e);
         numOfExceptions++;
       }
     }