Merging r1547474 through r1547657 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1547658 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-12-04 00:45:38 +00:00
commit b7ce46887b
21 changed files with 372 additions and 155 deletions

View File

@ -390,6 +390,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10126. LightWeightGSet log message is confusing. (Vinay via suresh)
HADOOP-10127. Add ipc.client.connect.retry.interval to control the frequency
of connection retries (Karthik Kambatla via Sandy Ryza)
OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)

View File

@ -192,6 +192,11 @@ public class CommonConfigurationKeysPublic {
/** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_KEY */
public static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY =
"ipc.client.connect.retry.interval";
/** Default value for IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY */
public static final int IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT = 1000;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY =
"ipc.client.connect.max.retries.on.timeouts";
/** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY */

View File

@ -1562,8 +1562,13 @@ static ConnectionId getConnectionId(InetSocketAddress addr,
final int max = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
final int retryInterval = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
CommonConfigurationKeysPublic
.IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT);
connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
max, 1, TimeUnit.SECONDS);
max, retryInterval, TimeUnit.MILLISECONDS);
}
boolean doPing =

View File

@ -618,6 +618,14 @@
</description>
</property>
<property>
<name>ipc.client.connect.retry.interval</name>
<value>1000</value>
<description>Indicates the number of milliseconds a client will wait for
before retrying to establish a server connection.
</description>
</property>
<property>
<name>ipc.client.connect.timeout</name>
<value>20000</value>

View File

@ -767,6 +767,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5563. NFS gateway should commit the buffered data when read request comes
after write to the same file (brandonli)
HDFS-4997. libhdfs doesn't return correct error codes in most cases (cmccabe)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -39,14 +39,6 @@ struct jsonException {
const char *message;
};
static void dotsToSlashes(char *str)
{
for (; *str != '\0'; str++) {
if (*str == '.')
*str = '/';
}
}
/** Print out the JSON exception information */
static int printJsonExceptionV(struct jsonException *exc, int noPrintFlags,
const char *fmt, va_list ap)
@ -62,7 +54,6 @@ static int printJsonExceptionV(struct jsonException *exc, int noPrintFlags,
fprintf(stderr, "printJsonExceptionV: internal out of memory error\n");
return EINTERNAL;
}
dotsToSlashes(javaClassName);
getExceptionInfo(javaClassName, noPrintFlags, &excErrno, &shouldPrint);
free(javaClassName);

View File

@ -35,36 +35,55 @@ struct ExceptionInfo {
static const struct ExceptionInfo gExceptionInfo[] = {
{
.name = "java/io/FileNotFoundException",
.name = "java.io.FileNotFoundException",
.noPrintFlag = NOPRINT_EXC_FILE_NOT_FOUND,
.excErrno = ENOENT,
},
{
.name = "org/apache/hadoop/security/AccessControlException",
.name = "org.apache.hadoop.security.AccessControlException",
.noPrintFlag = NOPRINT_EXC_ACCESS_CONTROL,
.excErrno = EACCES,
},
{
.name = "org/apache/hadoop/fs/UnresolvedLinkException",
.name = "org.apache.hadoop.fs.UnresolvedLinkException",
.noPrintFlag = NOPRINT_EXC_UNRESOLVED_LINK,
.excErrno = ENOLINK,
},
{
.name = "org/apache/hadoop/fs/ParentNotDirectoryException",
.name = "org.apache.hadoop.fs.ParentNotDirectoryException",
.noPrintFlag = NOPRINT_EXC_PARENT_NOT_DIRECTORY,
.excErrno = ENOTDIR,
},
{
.name = "java/lang/IllegalArgumentException",
.name = "java.lang.IllegalArgumentException",
.noPrintFlag = NOPRINT_EXC_ILLEGAL_ARGUMENT,
.excErrno = EINVAL,
},
{
.name = "java/lang/OutOfMemoryError",
.name = "java.lang.OutOfMemoryError",
.noPrintFlag = 0,
.excErrno = ENOMEM,
},
{
.name = "org.apache.hadoop.hdfs.server.namenode.SafeModeException",
.noPrintFlag = 0,
.excErrno = EROFS,
},
{
.name = "org.apache.hadoop.fs.FileAlreadyExistsException",
.noPrintFlag = 0,
.excErrno = EEXIST,
},
{
.name = "org.apache.hadoop.hdfs.protocol.QuotaExceededException",
.noPrintFlag = 0,
.excErrno = EDQUOT,
},
{
.name = "org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException",
.noPrintFlag = 0,
.excErrno = ESTALE,
},
};
void getExceptionInfo(const char *excName, int noPrintFlags,

View File

@ -48,7 +48,8 @@ struct tlhThreadInfo {
pthread_t thread;
};
static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs)
static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs,
const char *username)
{
int ret, port;
hdfsFS hdfs;
@ -70,6 +71,9 @@ static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs
TO_STR(TLH_DEFAULT_BLOCK_SIZE));
hdfsBuilderConfSetStr(bld, "dfs.blocksize",
TO_STR(TLH_DEFAULT_BLOCK_SIZE));
if (username) {
hdfsBuilderSetUserName(bld, username);
}
hdfs = hdfsBuilderConnect(bld);
if (!hdfs) {
ret = -errno;
@ -110,36 +114,58 @@ static int doTestGetDefaultBlockSize(hdfsFS fs, const char *path)
return 0;
}
static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
struct tlhPaths {
char prefix[256];
char file1[256];
char file2[256];
};
static int setupPaths(const struct tlhThreadInfo *ti, struct tlhPaths *paths)
{
char prefix[256], tmp[256];
memset(paths, sizeof(*paths), 0);
if (snprintf(paths->prefix, sizeof(paths->prefix), "/tlhData%04d",
ti->threadIdx) >= sizeof(paths->prefix)) {
return ENAMETOOLONG;
}
if (snprintf(paths->file1, sizeof(paths->file1), "%s/file1",
paths->prefix) >= sizeof(paths->file1)) {
return ENAMETOOLONG;
}
if (snprintf(paths->file2, sizeof(paths->file2), "%s/file2",
paths->prefix) >= sizeof(paths->file2)) {
return ENAMETOOLONG;
}
return 0;
}
static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
const struct tlhPaths *paths)
{
char tmp[4096];
hdfsFile file;
int ret, expected;
hdfsFileInfo *fileInfo;
struct hdfsReadStatistics *readStats = NULL;
snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx);
if (hdfsExists(fs, prefix) == 0) {
EXPECT_ZERO(hdfsDelete(fs, prefix, 1));
if (hdfsExists(fs, paths->prefix) == 0) {
EXPECT_ZERO(hdfsDelete(fs, paths->prefix, 1));
}
EXPECT_ZERO(hdfsCreateDirectory(fs, prefix));
snprintf(tmp, sizeof(tmp), "%s/file", prefix);
EXPECT_ZERO(hdfsCreateDirectory(fs, paths->prefix));
EXPECT_ZERO(doTestGetDefaultBlockSize(fs, prefix));
EXPECT_ZERO(doTestGetDefaultBlockSize(fs, paths->prefix));
/* There should not be any file to open for reading. */
EXPECT_NULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0));
EXPECT_NULL(hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0));
/* hdfsOpenFile should not accept mode = 3 */
EXPECT_NULL(hdfsOpenFile(fs, tmp, 3, 0, 0, 0));
EXPECT_NULL(hdfsOpenFile(fs, paths->file1, 3, 0, 0, 0));
file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0);
file = hdfsOpenFile(fs, paths->file1, O_WRONLY, 0, 0, 0);
EXPECT_NONNULL(file);
/* TODO: implement writeFully and use it here */
expected = strlen(prefix);
ret = hdfsWrite(fs, file, prefix, expected);
expected = strlen(paths->prefix);
ret = hdfsWrite(fs, file, paths->prefix, expected);
if (ret < 0) {
ret = errno;
fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret);
@ -155,7 +181,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
EXPECT_ZERO(hdfsCloseFile(fs, file));
/* Let's re-open the file for reading */
file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0);
file = hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0);
EXPECT_NONNULL(file);
EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
@ -180,60 +206,67 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
errno = 0;
EXPECT_INT_EQ(expected, readStats->totalBytesRead);
hdfsFileFreeReadStatistics(readStats);
EXPECT_ZERO(memcmp(prefix, tmp, expected));
EXPECT_ZERO(memcmp(paths->prefix, tmp, expected));
EXPECT_ZERO(hdfsCloseFile(fs, file));
// TODO: Non-recursive delete should fail?
//EXPECT_NONZERO(hdfsDelete(fs, prefix, 0));
EXPECT_ZERO(hdfsCopy(fs, paths->file1, fs, paths->file2));
snprintf(tmp, sizeof(tmp), "%s/file", prefix);
EXPECT_ZERO(hdfsChown(fs, tmp, NULL, NULL));
EXPECT_ZERO(hdfsChown(fs, tmp, NULL, "doop"));
fileInfo = hdfsGetPathInfo(fs, tmp);
EXPECT_ZERO(hdfsChown(fs, paths->file2, NULL, NULL));
EXPECT_ZERO(hdfsChown(fs, paths->file2, NULL, "doop"));
fileInfo = hdfsGetPathInfo(fs, paths->file2);
EXPECT_NONNULL(fileInfo);
EXPECT_ZERO(strcmp("doop", fileInfo->mGroup));
hdfsFreeFileInfo(fileInfo, 1);
EXPECT_ZERO(hdfsChown(fs, tmp, "ha", "doop2"));
fileInfo = hdfsGetPathInfo(fs, tmp);
EXPECT_ZERO(hdfsChown(fs, paths->file2, "ha", "doop2"));
fileInfo = hdfsGetPathInfo(fs, paths->file2);
EXPECT_NONNULL(fileInfo);
EXPECT_ZERO(strcmp("ha", fileInfo->mOwner));
EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
hdfsFreeFileInfo(fileInfo, 1);
EXPECT_ZERO(hdfsChown(fs, tmp, "ha2", NULL));
fileInfo = hdfsGetPathInfo(fs, tmp);
EXPECT_ZERO(hdfsChown(fs, paths->file2, "ha2", NULL));
fileInfo = hdfsGetPathInfo(fs, paths->file2);
EXPECT_NONNULL(fileInfo);
EXPECT_ZERO(strcmp("ha2", fileInfo->mOwner));
EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup));
hdfsFreeFileInfo(fileInfo, 1);
EXPECT_ZERO(hdfsDelete(fs, prefix, 1));
snprintf(tmp, sizeof(tmp), "%s/nonexistent-file-name", paths->prefix);
EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, tmp, "ha3", NULL), ENOENT);
return 0;
}
static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
{
hdfsFS fs = NULL;
struct tlhPaths paths;
fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
ti->threadIdx);
EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
EXPECT_ZERO(setupPaths(ti, &paths));
// test some operations
EXPECT_ZERO(doTestHdfsOperations(ti, fs, &paths));
EXPECT_ZERO(hdfsDisconnect(fs));
// reconnect as user "foo" and verify that we get permission errors
EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, "foo"));
EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, paths.file1, "ha3", NULL), EACCES);
EXPECT_ZERO(hdfsDisconnect(fs));
// reconnect to do the final delete.
EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
EXPECT_ZERO(hdfsDelete(fs, paths.prefix, 1));
EXPECT_ZERO(hdfsDisconnect(fs));
return 0;
}
static void *testHdfsOperations(void *v)
{
struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
hdfsFS fs = NULL;
int ret;
fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
ti->threadIdx);
ret = hdfsSingleNameNodeConnect(tlhCluster, &fs);
if (ret) {
fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
"hdfsSingleNameNodeConnect failed with error %d.\n",
ti->threadIdx, ret);
ti->success = EIO;
return NULL;
}
ti->success = doTestHdfsOperations(ti, fs);
if (hdfsDisconnect(fs)) {
ret = errno;
fprintf(stderr, "hdfsDisconnect error %d\n", ret);
ti->success = ret;
}
int ret = testHdfsOperationsImpl(ti);
ti->success = ret;
return NULL;
}

View File

@ -226,6 +226,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5631. TestJobEndNotifier.testNotifyRetries fails with Should
have taken more than 5 seconds in jdk7 (Jonathan Eagles via jlowe)
MAPREDUCE-5645. TestFixedLengthInputFormat fails with native libs (Mit
Desai via jeagles)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -197,17 +197,17 @@ public void testPartialRecordUncompressedIn() throws IOException {
public void testGzipWithTwoInputs() throws IOException {
CompressionCodec gzip = new GzipCodec();
localFs.delete(workDir, true);
// Create files with fixed length records with 5 byte long records.
writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
"one two threefour five six seveneightnine ten ");
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"ten nine eightsevensix five four threetwo one ");
FixedLengthInputFormat format = new FixedLengthInputFormat();
JobConf job = new JobConf(defaultConf);
format.setRecordLength(job, 5);
FileInputFormat.setInputPaths(job, workDir);
ReflectionUtils.setConf(gzip, job);
format.configure(job);
// Create files with fixed length records with 5 byte long records.
writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
"one two threefour five six seveneightnine ten ");
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"ten nine eightsevensix five four threetwo one ");
InputSplit[] splits = format.getSplits(job, 100);
assertEquals("compressed splits == 2", 2, splits.length);
FileSplit tmp = (FileSplit) splits[0];
@ -283,12 +283,16 @@ private void runRandomTests(CompressionCodec codec) throws IOException {
int fileSize = (totalRecords * recordLength);
LOG.info("totalRecords=" + totalRecords + " recordLength="
+ recordLength);
// Create the job
JobConf job = new JobConf(defaultConf);
if (codec != null) {
ReflectionUtils.setConf(codec, job);
}
// Create the test file
ArrayList<String> recordList
= createFile(file, codec, recordLength, totalRecords);
assertTrue(localFs.exists(file));
// Create the job and set the fixed length record length config property
JobConf job = new JobConf(defaultConf);
//set the fixed length record length config property for the job
FixedLengthInputFormat.setRecordLength(job, recordLength);
int numSplits = 1;
@ -383,8 +387,6 @@ private void runPartialRecordTest(CompressionCodec codec) throws IOException {
if (codec != null) {
fileName.append(".gz");
}
writeFile(localFs, new Path(workDir, fileName.toString()), codec,
"one two threefour five six seveneightnine ten");
FixedLengthInputFormat format = new FixedLengthInputFormat();
JobConf job = new JobConf(defaultConf);
format.setRecordLength(job, 5);
@ -393,6 +395,8 @@ private void runPartialRecordTest(CompressionCodec codec) throws IOException {
ReflectionUtils.setConf(codec, job);
}
format.configure(job);
writeFile(localFs, new Path(workDir, fileName.toString()), codec,
"one two threefour five six seveneightnine ten");
InputSplit[] splits = format.getSplits(job, 100);
if (codec != null) {
assertEquals("compressed splits == 1", 1, splits.length);

View File

@ -225,16 +225,16 @@ public void testPartialRecordUncompressedIn() throws Exception {
public void testGzipWithTwoInputs() throws Exception {
CompressionCodec gzip = new GzipCodec();
localFs.delete(workDir, true);
// Create files with fixed length records with 5 byte long records.
writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
"one two threefour five six seveneightnine ten ");
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"ten nine eightsevensix five four threetwo one ");
Job job = Job.getInstance(defaultConf);
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), 5);
ReflectionUtils.setConf(gzip, job.getConfiguration());
FileInputFormat.setInputPaths(job, workDir);
// Create files with fixed length records with 5 byte long records.
writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
"one two threefour five six seveneightnine ten ");
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"ten nine eightsevensix five four threetwo one ");
List<InputSplit> splits = format.getSplits(job);
assertEquals("compressed splits == 2", 2, splits.size());
FileSplit tmp = (FileSplit) splits.get(0);
@ -310,12 +310,16 @@ private void runRandomTests(CompressionCodec codec) throws Exception {
int fileSize = (totalRecords * recordLength);
LOG.info("totalRecords=" + totalRecords + " recordLength="
+ recordLength);
// Create the job
Job job = Job.getInstance(defaultConf);
if (codec != null) {
ReflectionUtils.setConf(codec, job.getConfiguration());
}
// Create the test file
ArrayList<String> recordList =
createFile(file, codec, recordLength, totalRecords);
assertTrue(localFs.exists(file));
// Create the job and set the fixed length record length config property
Job job = Job.getInstance(defaultConf);
//set the fixed length record length config property for the job
FixedLengthInputFormat.setRecordLength(job.getConfiguration(),
recordLength);

View File

@ -132,6 +132,9 @@ Release 2.4.0 - UNRELEASED
YARN-1318. Promoted AdminService to an Always-On service and merged it into
RMHAProtocolService. (Karthik Kambatla via vinodkv)
YARN-1332. In TestAMRMClient, replace assertTrue with assertEquals where
possible (Sebastian Wong via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
@ -191,6 +194,12 @@ Release 2.4.0 - UNRELEASED
YARN-1416. Fixed a few invalid transitions in RMApp, RMAppAttempt and in some
tests. (Jian He via vinodkv)
YARN-895. Changed RM state-store to not crash immediately if RM restarts while
the state-store is down. (Jian He via vinodkv)
YARN-1454. Fixed test failure issue with TestRMRestart. (Karthik Kambatla
via vinodkv)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -305,4 +305,9 @@
<Bug pattern="NM_CLASS_NOT_EXCEPTION" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>

View File

@ -301,22 +301,30 @@ public class YarnConfiguration extends Configuration {
public static final String RM_STORE = RM_PREFIX + "store.class";
/** URI for FileSystemRMStateStore */
public static final String FS_RM_STATE_STORE_URI =
RM_PREFIX + "fs.state-store.uri";
public static final String FS_RM_STATE_STORE_URI = RM_PREFIX
+ "fs.state-store.uri";
public static final String FS_RM_STATE_STORE_RETRY_POLICY_SPEC = RM_PREFIX
+ "fs.state-store.retry-policy-spec";
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
"2000, 500";
/**
* Comma separated host:port pairs, each corresponding to a ZK server for
* ZKRMStateStore
*/
public static final String ZK_STATE_STORE_PREFIX =
RM_PREFIX + "zk.state-store.";
RM_PREFIX + "zk-state-store.";
public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
ZK_STATE_STORE_PREFIX + "num-retries";
public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3;
public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 500;
/** retry interval when connecting to zookeeper*/
public static final String ZK_RM_STATE_STORE_RETRY_INTERVAL_MS =
ZK_STATE_STORE_PREFIX + "retry-interval-ms";
public static final long DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = 2000;
public static final String ZK_RM_STATE_STORE_ADDRESS =
ZK_STATE_STORE_PREFIX + "address";
/** Timeout in millisec for ZK server connection for ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
ZK_STATE_STORE_PREFIX + "timeout.ms";
ZK_STATE_STORE_PREFIX + "timeout-ms";
public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
/** Parent znode path under which ZKRMStateStore will create znodes */
public static final String ZK_RM_STATE_STORE_PARENT_PATH =

View File

@ -248,7 +248,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
matches = amClient.getMatchingRequests(priority, node, testCapability1);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest);
assertEquals(storedContainer1, storedRequest);
amClient.removeContainerRequest(storedContainer1);
// exact matching with order maintained
@ -259,9 +259,9 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
int i = 0;
for(ContainerRequest storedRequest1 : matches.get(0)) {
if(i++ == 0) {
assertTrue(storedContainer4 == storedRequest1);
assertEquals(storedContainer4, storedRequest1);
} else {
assertTrue(storedContainer6 == storedRequest1);
assertEquals(storedContainer6, storedRequest1);
}
}
amClient.removeContainerRequest(storedContainer6);
@ -276,7 +276,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
assert(matches.size() == 2);
// verify non-fitting containers are not returned and fitting ones are
for(Collection<ContainerRequest> testSet : matches) {
assertTrue(testSet.size() == 1);
assertEquals(1, testSet.size());
ContainerRequest testRequest = testSet.iterator().next();
assertTrue(testRequest != storedContainer4);
assertTrue(testRequest != storedContainer5);
@ -310,8 +310,8 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
private void verifyMatches(
List<? extends Collection<ContainerRequest>> matches,
int matchSize) {
assertTrue(matches.size() == 1);
assertTrue(matches.get(0).size() == matchSize);
assertEquals(1, matches.size());
assertEquals(matches.get(0).size(), matchSize);
}
@Test (timeout=60000)
@ -337,12 +337,12 @@ public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOExce
matches = amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest);
assertEquals(storedContainer1, storedRequest);
// inferred match rack
matches = amClient.getMatchingRequests(priority, rack, capability);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest);
assertEquals(storedContainer1, storedRequest);
// inferred rack match no longer valid after request is removed
amClient.removeContainerRequest(storedContainer1);
@ -387,10 +387,10 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
// test addition and storage
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
assertTrue(containersRequestedAny == 2);
assertEquals(2, containersRequestedAny);
containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
assertTrue(containersRequestedAny == 1);
assertEquals(1, containersRequestedAny);
List<? extends Collection<ContainerRequest>> matches =
amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 2);
@ -417,7 +417,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
// test matching of containers
ContainerRequest storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest);
assertEquals(storedContainer1, storedRequest);
amClient.removeContainerRequest(storedContainer1);
matches =
amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
@ -438,10 +438,10 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
&& iterationsLeft-- > 0) {
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
assertTrue(nodeCount == amClient.getClusterNodeCount());
assertEquals(nodeCount, amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
ContainerRequest expectedRequest =
@ -453,7 +453,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
// test correct matched container is returned
verifyMatches(matches, 1);
ContainerRequest matchedRequest = matches.get(0).iterator().next();
assertTrue(matchedRequest == expectedRequest);
assertEquals(matchedRequest, expectedRequest);
amClient.removeContainerRequest(matchedRequest);
// assign this container, use it and release it
amClient.releaseAssignedContainer(container.getId());
@ -464,11 +464,11 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
}
}
assertTrue(allocatedContainerCount == 2);
assertEquals(2, allocatedContainerCount);
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.release.size() == 0);
assertTrue(amClient.ask.size() == 0);
assertTrue(allocResponse.getAllocatedContainers().size() == 0);
assertEquals(0, amClient.release.size());
assertEquals(0, amClient.ask.size());
assertEquals(0, allocResponse.getAllocatedContainers().size());
// 0 requests left. everything got cleaned up
assertTrue(amClient.remoteRequestsTable.isEmpty());
@ -494,14 +494,14 @@ public void testAllocationWithBlacklist() throws YarnException, IOException {
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
ContainerRequest storedContainer1 =
new ContainerRequest(capability, nodes, racks, priority);
amClient.addContainerRequest(storedContainer1);
assertTrue(amClient.ask.size() == 3);
assertTrue(amClient.release.size() == 0);
assertEquals(3, amClient.ask.size());
assertEquals(0, amClient.release.size());
List<String> localNodeBlacklist = new ArrayList<String>();
localNodeBlacklist.add(node);
@ -512,7 +512,7 @@ public void testAllocationWithBlacklist() throws YarnException, IOException {
int allocatedContainerCount = getAllocatedContainersNumber(amClient,
DEFAULT_ITERATION);
// the only node is in blacklist, so no allocation
assertTrue(allocatedContainerCount == 0);
assertEquals(0, allocatedContainerCount);
// Remove node from blacklist, so get assigned with 2
amClient.updateBlacklist(null, localNodeBlacklist);
@ -521,7 +521,7 @@ public void testAllocationWithBlacklist() throws YarnException, IOException {
amClient.addContainerRequest(storedContainer2);
allocatedContainerCount = getAllocatedContainersNumber(amClient,
DEFAULT_ITERATION);
assertEquals(allocatedContainerCount, 2);
assertEquals(2, allocatedContainerCount);
// Test in case exception in allocate(), blacklist is kept
assertTrue(amClient.blacklistAdditions.isEmpty());
@ -538,7 +538,7 @@ public void testAllocationWithBlacklist() throws YarnException, IOException {
amClient.allocate(0.1f);
fail("there should be an exception here.");
} catch (Exception e) {
assertEquals(amClient.blacklistAdditions.size(), 1);
assertEquals(1, amClient.blacklistAdditions.size());
}
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
@ -565,16 +565,16 @@ public void testAMRMClientWithBlacklist() throws YarnException, IOException {
nodeList01.add(nodes[0]);
nodeList01.add(nodes[1]);
amClient.updateBlacklist(nodeList01, null);
assertEquals(amClient.blacklistAdditions.size(),2);
assertEquals(amClient.blacklistRemovals.size(),0);
assertEquals(2, amClient.blacklistAdditions.size());
assertEquals(0, amClient.blacklistRemovals.size());
// Add nodes[0] again, verify it is not added duplicated.
List<String> nodeList02 = new ArrayList<String>();
nodeList02.add(nodes[0]);
nodeList02.add(nodes[2]);
amClient.updateBlacklist(nodeList02, null);
assertEquals(amClient.blacklistAdditions.size(),3);
assertEquals(amClient.blacklistRemovals.size(),0);
assertEquals(3, amClient.blacklistAdditions.size());
assertEquals(0, amClient.blacklistRemovals.size());
// Add nodes[1] and nodes[2] to removal list,
// Verify addition list remove these two nodes.
@ -582,16 +582,16 @@ public void testAMRMClientWithBlacklist() throws YarnException, IOException {
nodeList12.add(nodes[1]);
nodeList12.add(nodes[2]);
amClient.updateBlacklist(null, nodeList12);
assertEquals(amClient.blacklistAdditions.size(),1);
assertEquals(amClient.blacklistRemovals.size(),2);
assertEquals(1, amClient.blacklistAdditions.size());
assertEquals(2, amClient.blacklistRemovals.size());
// Add nodes[1] again to addition list,
// Verify removal list will remove this node.
List<String> nodeList1 = new ArrayList<String>();
nodeList1.add(nodes[1]);
amClient.updateBlacklist(nodeList1, null);
assertEquals(amClient.blacklistAdditions.size(),2);
assertEquals(amClient.blacklistRemovals.size(),1);
assertEquals(2, amClient.blacklistAdditions.size());
assertEquals(1, amClient.blacklistRemovals.size());
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
@ -606,10 +606,10 @@ private int getAllocatedContainersNumber(
while (iterationsLeft-- > 0) {
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
assertTrue(nodeCount == amClient.getClusterNodeCount());
assertEquals(nodeCount, amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
if(allocatedContainerCount == 0) {
@ -654,8 +654,8 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
throws YarnException, IOException {
// setup container request
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
@ -677,11 +677,11 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
assertTrue(containersRequestedNode == 2);
assertTrue(containersRequestedRack == 2);
assertTrue(containersRequestedAny == 2);
assertTrue(amClient.ask.size() == 3);
assertTrue(amClient.release.size() == 0);
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
assertEquals(2, containersRequestedAny);
assertEquals(3, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
@ -695,10 +695,10 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
while (allocatedContainerCount < containersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
assertTrue(nodeCount == amClient.getClusterNodeCount());
assertEquals(nodeCount, amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
@ -724,19 +724,19 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
Assert.assertTrue(receivedNMTokens.size() > 0
&& receivedNMTokens.size() <= nodeCount);
assertTrue(allocatedContainerCount == containersRequestedAny);
assertTrue(amClient.release.size() == 2);
assertTrue(amClient.ask.size() == 0);
assertEquals(allocatedContainerCount, containersRequestedAny);
assertEquals(2, amClient.release.size());
assertEquals(0, amClient.ask.size());
// need to tell the AMRMClient that we dont need these resources anymore
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
assertTrue(amClient.ask.size() == 3);
assertEquals(3, amClient.ask.size());
// send 0 container count request for resources that are no longer needed
ResourceRequest snoopRequest = amClient.ask.iterator().next();
assertTrue(snoopRequest.getNumContainers() == 0);
assertEquals(0, snoopRequest.getNumContainers());
// test RPC exception handling
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
@ -744,7 +744,7 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
racks, priority));
snoopRequest = amClient.ask.iterator().next();
assertTrue(snoopRequest.getNumContainers() == 2);
assertEquals(2, snoopRequest.getNumContainers());
ApplicationMasterProtocol realRM = amClient.rmClient;
try {
@ -768,12 +768,12 @@ public AllocateResponse answer(InvocationOnMock invocation)
amClient.rmClient = realRM;
}
assertTrue(amClient.release.size() == 2);
assertTrue(amClient.ask.size() == 3);
assertEquals(2, amClient.release.size());
assertEquals(3, amClient.ask.size());
snoopRequest = amClient.ask.iterator().next();
// verify that the remove request made in between makeRequest and allocate
// has not been lost
assertTrue(snoopRequest.getNumContainers() == 0);
assertEquals(0, snoopRequest.getNumContainers());
iterationsLeft = 3;
// do a few iterations to ensure RM is not going send new containers
@ -781,13 +781,13 @@ public AllocateResponse answer(InvocationOnMock invocation)
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
assertTrue(allocResponse.getAllocatedContainers().size() == 0);
assertEquals(0, allocResponse.getAllocatedContainers().size());
if(allocResponse.getCompletedContainersStatuses().size() > 0) {
for(ContainerStatus cStatus :allocResponse
.getCompletedContainersStatuses()) {
if(releases.contains(cStatus.getContainerId())) {
assertTrue(cStatus.getState() == ContainerState.COMPLETE);
assertTrue(cStatus.getExitStatus() == -100);
assertEquals(cStatus.getState(), ContainerState.COMPLETE);
assertEquals(-100, cStatus.getExitStatus());
releases.remove(cStatus.getContainerId());
}
}
@ -797,8 +797,8 @@ public AllocateResponse answer(InvocationOnMock invocation)
sleep(100);
}
}
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
}
private void sleep(int sleepTime) {

View File

@ -283,8 +283,8 @@
is implicitly fenced, meaning a single ResourceManager is
able to use the store at any point in time. More details on this, along
with setting up appropriate ACLs is discussed under the description for
yarn.resourcemanager.zk.state-store.root-node.acl.</description>
<name>yarn.resourcemanager.zk.state-store.address</name>
yarn.resourcemanager.zk-state-store.root-node.acl.</description>
<name>yarn.resourcemanager.zk-state-store.address</name>
<!--value>127.0.0.1:2181</value-->
</property>
@ -293,8 +293,15 @@
ZooKeeper. This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.num-retries</name>
<value>3</value>
<name>yarn.resourcemanager.zk-state-store.num-retries</name>
<value>500</value>
</property>
<property>
<description>Retry interval in milliseconds when ZKRMStateStore tries to
connect to ZooKeeper.</description>
<name>yarn.resourcemanager.zk-state-store.retry-interval-ms</name>
<value>2000</value>
</property>
<property>
@ -302,16 +309,20 @@
stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.parent-path</name>
<name>yarn.resourcemanager.zk-state-store.parent-path</name>
<value>/rmstore</value>
</property>
<property>
<description>Timeout when connecting to ZooKeeper.
<description>ZooKeeper session timeout in milliseconds. Session expiration
is managed by the ZooKeeper cluster itself, not by the client. This value is
used by the cluster to determine when the client's session expires.
Expirations happens when the cluster does not hear from the client within
the specified session timeout period (i.e. no heartbeat).
This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.timeout.ms</name>
<name>yarn.resourcemanager.zk-state-store.timeout-ms</name>
<value>60000</value>
</property>
@ -320,7 +331,7 @@
This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.acl</name>
<name>yarn.resourcemanager.zk-state-store.acl</name>
<value>world:anyone:rwcda</value>
</property>
@ -336,7 +347,7 @@
permissions.
By default, when this property is not set, we use the ACLs from
yarn.resourcemanager.zk.state-store.acl for shared admin access and
yarn.resourcemanager.zk-state-store.acl for shared admin access and
rm-address:cluster-timestamp for username-based exclusive create-delete
access.
@ -346,7 +357,7 @@
ResourceManagers have shared admin access and the Active ResourceManger
takes over (exclusively) the create-delete access.
</description>
<name>yarn.resourcemanager.zk.state-store.root-node.acl</name>
<name>yarn.resourcemanager.zk-state-store.root-node.acl</name>
</property>
<property>
@ -359,6 +370,16 @@
<!--value>hdfs://localhost:9000/rmstore</value-->
</property>
<property>
<description>hdfs client retry policy specification. hdfs client retry
is always enabled. Specified in pairs of sleep-time and number-of-retries
and (t0, n0), (t1, n1), ..., the first n0 retries sleep t0 milliseconds on
average, the following n1 retries sleep t1 milliseconds on average, and so on.
</description>
<name>yarn.resourcemanager.fs.state-store.retry-policy-spec</name>
<value>2000, 500</value>
</property>
<property>
<description>Enable RM high-availability. When enabled,
(1) The RM starts in the Standby mode by default, and transitions to

View File

@ -94,7 +94,14 @@ protected synchronized void startInternal() throws Exception {
// create filesystem only now, as part of service-start. By this time, RM is
// authenticated with kerberos so we are good to create a file-system
// handle.
fs = fsWorkingPath.getFileSystem(getConfig());
Configuration conf = new Configuration(getConfig());
conf.setBoolean("dfs.client.retry.policy.enabled", true);
String retryPolicy =
conf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC);
conf.set("dfs.client.retry.policy.spec", retryPolicy);
fs = fsWorkingPath.getFileSystem(conf);
fs.mkdirs(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot);
}

View File

@ -82,6 +82,7 @@ public class ZKRMStateStore extends RMStateStore {
private String zkHostPort = null;
private int zkSessionTimeout;
private long zkRetryInterval;
private List<ACL> zkAcl;
private String zkRootNodePath;
private String rmDTSecretManagerRoot;
@ -161,6 +162,9 @@ public synchronized void initInternal(Configuration conf) throws Exception {
zkSessionTimeout =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS);
zkRetryInterval =
conf.getLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS);
// Parse authentication from configuration.
String zkAclConf =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL,
@ -810,6 +814,9 @@ T runWithRetries() throws Exception {
}
} catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < numRetries) {
LOG.info("Waiting for zookeeper to be connected, retry no. + "
+ retry);
Thread.sleep(zkRetryInterval);
continue;
}
throw ke;

View File

@ -1247,6 +1247,8 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// renewDate before renewing
Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
try{
// Sleep for one millisecond to make sure renewDataAfterRenew is greater
Thread.sleep(1);
// renew recovered token
rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
} catch(Exception e) {

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@ -33,7 +36,9 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@ -81,6 +86,8 @@ public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString());
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
"100,6000");
this.store = new TestFileSystemRMStore(conf);
return store;
}
@ -139,4 +146,46 @@ public void testFSRMStateStore() throws Exception {
cluster.shutdown();
}
}
@Test (timeout = 30000)
public void testFSRMStateStoreClientRetry() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
final RMStateStore store = fsTester.getRMStateStore();
store.setRMDispatcher(new TestDispatcher());
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
cluster.shutdownNameNodes();
Thread clientThread = new Thread() {
@Override
public void run() {
try {
store.storeApplicationStateInternal("application1",
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
.newApplicationStateData(111, 111, "user", null,
RMAppState.ACCEPTED, "diagnostics", 333));
} catch (Exception e) {
// TODO 0 datanode exception will not be retried by dfs client, fix
// that separately.
if (!e.getMessage().contains("could only be replicated" +
" to 0 nodes instead of minReplication (=1)")) {
assertionFailedInThread.set(true);
}
e.printStackTrace();
}
}
};
Thread.sleep(2000);
clientThread.start();
cluster.restartNameNode();
clientThread.join();
Assert.assertFalse(assertionFailedInThread.get());
} finally {
cluster.shutdown();
}
}
}

View File

@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -114,6 +115,37 @@ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
}
}
@Test (timeout = 20000)
public void testZKClientRetry() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
final String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
final ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
stopServer();
Thread clientThread = new Thread() {
@Override
public void run() {
try {
store.getDataWithRetries(path, true);
} catch (Exception e) {
e.printStackTrace();
assertionFailedInThread.set(true);
}
}
};
Thread.sleep(2000);
startServer();
clientThread.join();
Assert.assertFalse(assertionFailedInThread.get());
}
@Test(timeout = 20000)
public void testZKClientDisconnectAndReconnect()
throws Exception {