MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte characters in the job name. Contributed by Kousuke Saruta.

This commit is contained in:
Akira Ajisaka 2016-01-29 16:19:28 +09:00
parent 8ee060311c
commit df99ea8a92
3 changed files with 296 additions and 77 deletions

View File

@ -709,6 +709,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6563. Streaming documentation contains a stray '%' character.
(cnauroth)
MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte
characters in the job name. (Kousuke Saruta via aajisaka)
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -22,6 +22,7 @@
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,7 +36,7 @@ public class FileNameIndexUtils {
// Sanitize job history file for predictable parsing
static final String DELIMITER = "-";
static final String DELIMITER_ESCAPE = "%2D";
private static final Log LOG = LogFactory.getLog(FileNameIndexUtils.class);
// Job history file names need to be backwards compatible
@ -57,7 +58,8 @@ public class FileNameIndexUtils {
* @param indexInfo the index info.
* @return the done job history filename.
*/
public static String getDoneFileName(JobIndexInfo indexInfo) throws IOException {
public static String getDoneFileName(JobIndexInfo indexInfo)
throws IOException {
return getDoneFileName(indexInfo,
JHAdminConfig.DEFAULT_MR_HS_JOBNAME_LIMIT);
}
@ -66,49 +68,58 @@ public static String getDoneFileName(JobIndexInfo indexInfo,
int jobNameLimit) throws IOException {
StringBuilder sb = new StringBuilder();
//JobId
sb.append(escapeDelimiters(TypeConverter.fromYarn(indexInfo.getJobId()).toString()));
sb.append(encodeJobHistoryFileName(escapeDelimiters(
TypeConverter.fromYarn(indexInfo.getJobId()).toString())));
sb.append(DELIMITER);
//SubmitTime
sb.append(indexInfo.getSubmitTime());
sb.append(encodeJobHistoryFileName(String.valueOf(
indexInfo.getSubmitTime())));
sb.append(DELIMITER);
//UserName
sb.append(escapeDelimiters(getUserName(indexInfo)));
sb.append(encodeJobHistoryFileName(escapeDelimiters(
getUserName(indexInfo))));
sb.append(DELIMITER);
//JobName
sb.append(escapeDelimiters(trimJobName(
getJobName(indexInfo), jobNameLimit)));
sb.append(trimURLEncodedString(encodeJobHistoryFileName(escapeDelimiters(
getJobName(indexInfo))), jobNameLimit));
sb.append(DELIMITER);
//FinishTime
sb.append(indexInfo.getFinishTime());
sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getFinishTime())));
sb.append(DELIMITER);
//NumMaps
sb.append(indexInfo.getNumMaps());
sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getNumMaps())));
sb.append(DELIMITER);
//NumReduces
sb.append(indexInfo.getNumReduces());
sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getNumReduces())));
sb.append(DELIMITER);
//JobStatus
sb.append(indexInfo.getJobStatus());
sb.append(encodeJobHistoryFileName(indexInfo.getJobStatus()));
sb.append(DELIMITER);
//QueueName
sb.append(escapeDelimiters(getQueueName(indexInfo)));
sb.append(escapeDelimiters(encodeJobHistoryFileName(
getQueueName(indexInfo))));
sb.append(DELIMITER);
//JobStartTime
sb.append(indexInfo.getJobStartTime());
sb.append(encodeJobHistoryFileName(
String.valueOf(indexInfo.getJobStartTime())));
sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
return encodeJobHistoryFileName(sb.toString());
sb.append(encodeJobHistoryFileName(
JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
return sb.toString();
}
/**
* Parses the provided job history file name to construct a
* JobIndexInfo object which is returned.
@ -116,21 +127,24 @@ public static String getDoneFileName(JobIndexInfo indexInfo,
* @param jhFileName the job history filename.
* @return a JobIndexInfo object built from the filename.
*/
public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
String fileName = jhFileName.substring(0, jhFileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
public static JobIndexInfo getIndexInfo(String jhFileName)
throws IOException {
String fileName = jhFileName.substring(0,
jhFileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
JobIndexInfo indexInfo = new JobIndexInfo();
String[] jobDetails = fileName.split(DELIMITER);
JobID oldJobId = JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
JobID oldJobId =
JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
JobId jobId = TypeConverter.toYarn(oldJobId);
indexInfo.setJobId(jobId);
// Do not fail if there are some minor parse errors
try {
try {
indexInfo.setSubmitTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
indexInfo.setSubmitTime(Long.parseLong(
decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse submit time from job history file "
+ jhFileName + " : " + e);
@ -143,24 +157,24 @@ public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
try {
indexInfo.setFinishTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
indexInfo.setFinishTime(Long.parseLong(
decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse finish time from job history file "
+ jhFileName + " : " + e);
}
try {
indexInfo.setNumMaps(
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
indexInfo.setNumMaps(Integer.parseInt(
decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse num maps from job history file "
+ jhFileName + " : " + e);
}
try {
indexInfo.setNumReduces(
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
indexInfo.setNumReduces(Integer.parseInt(
decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse num reduces from job history file "
+ jhFileName + " : " + e);
@ -176,8 +190,8 @@ public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
if (jobDetails.length <= JOB_START_TIME_INDEX) {
indexInfo.setJobStartTime(indexInfo.getSubmitTime());
} else {
indexInfo.setJobStartTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
indexInfo.setJobStartTime(Long.parseLong(
decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
}
} catch (NumberFormatException e){
LOG.warn("Unable to parse start time from job history file "
@ -187,13 +201,13 @@ public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
LOG.warn("Parsing job history file with partial data encoded into name: "
+ jhFileName);
}
return indexInfo;
}
/**
* Helper function to encode the URL of the filename of the job-history
* Helper function to encode the URL of the filename of the job-history
* log file.
*
* @param logFileName file name of the job-history file
@ -208,7 +222,8 @@ public static String encodeJobHistoryFileName(String logFileName)
if (logFileName.contains(DELIMITER_ESCAPE)) {
replacementDelimiterEscape = nonOccursString(logFileName);
logFileName = logFileName.replaceAll(DELIMITER_ESCAPE, replacementDelimiterEscape);
logFileName = logFileName.replaceAll(
DELIMITER_ESCAPE, replacementDelimiterEscape);
}
String encodedFileName = null;
@ -223,14 +238,15 @@ public static String encodeJobHistoryFileName(String logFileName)
// Restore protected escape delimiters after encoding
if (replacementDelimiterEscape != null) {
encodedFileName = encodedFileName.replaceAll(replacementDelimiterEscape, DELIMITER_ESCAPE);
encodedFileName = encodedFileName.replaceAll(
replacementDelimiterEscape, DELIMITER_ESCAPE);
}
return encodedFileName;
}
/**
* Helper function to decode the URL of the filename of the job-history
* Helper function to decode the URL of the filename of the job-history
* log file.
*
* @param logFileName file name of the job-history file
@ -250,7 +266,7 @@ public static String decodeJobHistoryFileName(String logFileName)
}
return decodedFileName;
}
static String nonOccursString(String logFileName) {
int adHocIndex = 0;
@ -262,11 +278,11 @@ static String nonOccursString(String logFileName) {
return unfoundString + "q";
}
private static String getUserName(JobIndexInfo indexInfo) {
return getNonEmptyString(indexInfo.getUser());
}
private static String getJobName(JobIndexInfo indexInfo) {
return getNonEmptyString(indexInfo.getJobName());
}
@ -283,18 +299,65 @@ private static String getNonEmptyString(String in) {
}
return in;
}
private static String escapeDelimiters(String escapee) {
return escapee.replaceAll(DELIMITER, DELIMITER_ESCAPE);
}
/**
* Trims the job-name if required
* Trims the url-encoded string if required
*/
private static String trimJobName(String jobName, int jobNameLimit) {
if (jobName.length() > jobNameLimit) {
jobName = jobName.substring(0, jobNameLimit);
private static String trimURLEncodedString(
String encodedString, int limitLength) {
assert(limitLength >= 0) : "limitLength should be positive integer";
if (encodedString.length() < limitLength) {
return encodedString;
}
return jobName;
int index = 0;
int increase = 0;
byte[] strBytes = encodedString.getBytes(UTF_8);
// calculate effective character length based on UTF-8 specification.
// The size of a character coded in UTF-8 should be 4-byte at most.
// See RFC3629
while (true) {
byte b = strBytes[index];
if (b == '%') {
byte minuend1 = strBytes[index + 1];
byte subtrahend1 = (byte)(Character.isDigit(
minuend1) ? '0' : 'A' - 10);
byte minuend2 = strBytes[index + 2];
byte subtrahend2 = (byte)(Character.isDigit(
minuend2) ? '0' : 'A' - 10);
int initialHex =
((Character.toUpperCase(minuend1) - subtrahend1) << 4) +
(Character.toUpperCase(minuend2) - subtrahend2);
if (0x00 <= initialHex && initialHex <= 0x7F) {
// For 1-byte UTF-8 characters
increase = 3;
} else if (0xC2 <= initialHex && initialHex <= 0xDF) {
// For 2-byte UTF-8 characters
increase = 6;
} else if (0xE0 <= initialHex && initialHex <= 0xEF) {
// For 3-byte UTF-8 characters
increase = 9;
} else {
// For 4-byte UTF-8 characters
increase = 12;
}
} else {
increase = 1;
}
if (index + increase > limitLength) {
break;
} else {
index += increase;
}
}
return encodedString.substring(0, index);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.jobhistory;
import java.io.IOException;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
@ -30,14 +31,14 @@
public class TestFileNameIndexUtils {
private static final String OLD_JOB_HISTORY_FILE_FORMATTER = "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
private static final String OLD_FORMAT_BEFORE_ADD_START_TIME = "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
@ -51,29 +52,29 @@ public class TestFileNameIndexUtils {
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
private static final String JOB_HISTORY_FILE_FORMATTER = "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
private static final String JOB_ID = "job_1317928501754_0001";
private static final String SUBMIT_TIME = "1317928742025";
private static final String USER_NAME = "username";
private static final String USER_NAME_WITH_DELIMITER = "user"
+ FileNameIndexUtils.DELIMITER + "name";
+ FileNameIndexUtils.DELIMITER + "name";
private static final String USER_NAME_WITH_DELIMITER_ESCAPE = "user"
+ FileNameIndexUtils.DELIMITER_ESCAPE + "name";
+ FileNameIndexUtils.DELIMITER_ESCAPE + "name";
private static final String JOB_NAME = "mapreduce";
private static final String JOB_NAME_WITH_DELIMITER = "map"
+ FileNameIndexUtils.DELIMITER + "reduce";
+ FileNameIndexUtils.DELIMITER + "reduce";
private static final String JOB_NAME_WITH_DELIMITER_ESCAPE = "map"
+ FileNameIndexUtils.DELIMITER_ESCAPE + "reduce";
+ FileNameIndexUtils.DELIMITER_ESCAPE + "reduce";
private static final String FINISH_TIME = "1317928754958";
private static final String NUM_MAPS = "1";
private static final String NUM_REDUCES = "1";
@ -123,7 +124,7 @@ public void testEncodingDecodingEquivalence() throws IOException {
Assert.assertEquals("Queue name different after encoding and decoding",
info.getQueueName(), parsedInfo.getQueueName());
Assert.assertEquals("Job start time different after encoding and decoding",
info.getJobStartTime(), parsedInfo.getJobStartTime());
info.getJobStartTime(), parsedInfo.getJobStartTime());
}
@Test
@ -173,6 +174,158 @@ public void testTrimJobName() throws IOException {
parsedInfo.getJobName());
}
/**
* Verify the name of jobhistory file is not greater than 255 bytes
* even if there are some multibyte characters in the job name.
*/
@Test
public void testJobNameWithMultibyteChars() throws IOException {
JobIndexInfo info = new JobIndexInfo();
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
info.setJobId(jobId);
info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
info.setUser(USER_NAME);
StringBuilder sb = new StringBuilder();
info.setFinishTime(Long.parseLong(FINISH_TIME));
info.setNumMaps(Integer.parseInt(NUM_MAPS));
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME);
info.setJobStartTime(Long.parseLong(JOB_START_TIME));
// Test for 1 byte UTF-8 character
// which is encoded into 1 x 3 = 3 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('%');
}
String longJobName = sb.toString();
info.setJobName(longJobName);
String jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 50);
Assert.assertTrue(jobHistoryFile.length() <= 255);
String trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 3 x 16 < 50 < 3 x 17 so the length of trimedJobName should be 48
Assert.assertEquals(48, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
byte[] trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
String reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 2 bytes UTF-8 character
// which is encoded into 2 x 3 = 6 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('\u03A9'); // large omega
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 27);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 6 x 4 < 27 < 6 x 5 so the length of trimedJobName should be 24
Assert.assertEquals(24, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 3 bytes UTF-8 character
// which is encoded into 3 x 3 = 9 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append('\u2192'); // rightwards arrow
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 40);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 9 x 4 < 40 < 9 x 5 so the length of trimedJobName should be 36
Assert.assertEquals(36, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for 4 bytes UTF-8 character
// which is encoded into 4 x 3 = 12 characters by URL encode.
for (int i = 0; i < 100; i++) {
sb.append("\uD867\uDE3D"); // Mugil cephalus in Kanji.
}
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 49);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// 12 x 4 < 49 < 12 x 5 so the length of trimedJobName should be 48
Assert.assertEquals(48, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
sb.setLength(0);
// Test for the combination of 1 to 4 bytes UTF-8 characters
sb.append('\u732B') // cat in Kanji (encoded into 3 bytes x 3 characters)
.append("[") // (encoded into 1 byte x 3 characters)
.append('\u03BB') // small lambda (encoded into 2 bytes x 3 characters)
.append('/') // (encoded into 1 byte x 3 characters)
.append('A') // not url-encoded (1 byte x 1 character)
.append("\ud867\ude49") // flying fish in
// Kanji (encoded into 4 bytes x 3 characters)
.append('\u72AC'); // dog in Kanji (encoded into 3 bytes x 3 characters)
longJobName = sb.toString();
info.setJobName(longJobName);
jobHistoryFile =
FileNameIndexUtils.getDoneFileName(info, 23);
Assert.assertTrue(jobHistoryFile.length() <= 255);
trimedJobName = jobHistoryFile.split(
FileNameIndexUtils.DELIMITER)[3]; // 3 is index of job name
// total size of the first 5 characters = 22
// 23 < total size of the first 6 characters
Assert.assertEquals(22, trimedJobName.getBytes(UTF_8).length);
// validate whether trimmedJobName by testing reversibility
trimedJobNameInByte = trimedJobName.getBytes(UTF_8);
reEncodedTrimedJobName = new String(trimedJobNameInByte, UTF_8);
Assert.assertArrayEquals(trimedJobNameInByte,
reEncodedTrimedJobName.getBytes(UTF_8));
}
@Test
public void testUserNamePercentDecoding() throws IOException {
String jobHistoryFile = String.format(JOB_HISTORY_FILE_FORMATTER,