Merging r1549949 through r1550312 from trunk to branch HDFS-2832
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1550313 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
cd083aa807
@ -538,6 +538,9 @@ Release 2.3.0 - UNRELEASED
|
||||
HADOOP-10081. Client.setupIOStreams can leak socket resources on exception
|
||||
or error (Tsuyoshi OZAWA via jlowe)
|
||||
|
||||
HADOOP-10087. UserGroupInformation.getGroupNames() fails to return primary
|
||||
group first when JniBasedUnixGroupsMappingWithFallback is used (cmccabe)
|
||||
|
||||
Release 2.2.0 - 2013-10-13
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -122,13 +122,43 @@ int hadoop_user_info_fetch(struct hadoop_user_info *uinfo,
|
||||
}
|
||||
}
|
||||
|
||||
static int put_primary_gid_first(struct hadoop_user_info *uinfo)
|
||||
{
|
||||
int i, num_gids = uinfo->num_gids;
|
||||
gid_t first_gid;
|
||||
gid_t gid;
|
||||
gid_t primary = uinfo->pwd.pw_gid;
|
||||
|
||||
if (num_gids < 1) {
|
||||
// There are no gids, but we expected at least one.
|
||||
return EINVAL;
|
||||
}
|
||||
first_gid = uinfo->gids[0];
|
||||
if (first_gid == primary) {
|
||||
// First gid is already the primary.
|
||||
return 0;
|
||||
}
|
||||
for (i = 1; i < num_gids; i++) {
|
||||
gid = uinfo->gids[i];
|
||||
if (gid == primary) {
|
||||
// swap first gid and this gid.
|
||||
uinfo->gids[0] = gid;
|
||||
uinfo->gids[i] = first_gid;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
// Did not find the primary gid in the list.
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
int hadoop_user_info_getgroups(struct hadoop_user_info *uinfo)
|
||||
{
|
||||
int ret, ngroups;
|
||||
gid_t *ngids;
|
||||
|
||||
if (!uinfo->pwd.pw_name) {
|
||||
return EINVAL; // invalid user info
|
||||
// invalid user info
|
||||
return EINVAL;
|
||||
}
|
||||
uinfo->num_gids = 0;
|
||||
if (!uinfo->gids) {
|
||||
@ -141,8 +171,12 @@ int hadoop_user_info_getgroups(struct hadoop_user_info *uinfo)
|
||||
ngroups = uinfo->gids_size;
|
||||
ret = getgrouplist(uinfo->pwd.pw_name, uinfo->pwd.pw_gid,
|
||||
uinfo->gids, &ngroups);
|
||||
if (ret != -1) {
|
||||
if (ret > 0) {
|
||||
uinfo->num_gids = ngroups;
|
||||
ret = put_primary_gid_first(uinfo);
|
||||
if (ret) {
|
||||
return ret;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
ngids = realloc(uinfo->gids, sizeof(uinfo->gids[0]) * ngroups);
|
||||
@ -153,11 +187,12 @@ int hadoop_user_info_getgroups(struct hadoop_user_info *uinfo)
|
||||
uinfo->gids_size = ngroups;
|
||||
ret = getgrouplist(uinfo->pwd.pw_name, uinfo->pwd.pw_gid,
|
||||
uinfo->gids, &ngroups);
|
||||
if (ret != -1) {
|
||||
uinfo->num_gids = ngroups;
|
||||
return 0;
|
||||
if (ret < 0) {
|
||||
return EIO;
|
||||
}
|
||||
return EIO;
|
||||
uinfo->num_gids = ngroups;
|
||||
ret = put_primary_gid_first(uinfo);
|
||||
return ret;
|
||||
}
|
||||
|
||||
#ifdef USER_TESTING
|
||||
|
@ -587,6 +587,9 @@ Release 2.4.0 - UNRELEASED
|
||||
|
||||
HDFS-5633. Improve OfflineImageViewer to use less memory. (jing9)
|
||||
|
||||
HDFS-5023. TestSnapshotPathINodes.testAllowSnapshot is failing with jdk7
|
||||
(Mit Desai via jeagles)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
||||
@ -616,9 +619,6 @@ Release 2.4.0 - UNRELEASED
|
||||
HDFS-5352. Server#initLog() doesn't close InputStream in httpfs. (Ted Yu via
|
||||
jing9)
|
||||
|
||||
HDFS-5283. Under construction blocks only inside snapshots should not be
|
||||
counted in safemode threshold. (Vinay via szetszwo)
|
||||
|
||||
HDFS-4376. Fix race conditions in Balancer. (Junping Du via szetszwo)
|
||||
|
||||
HDFS-5375. hdfs.cmd does not expose several snapshot commands. (cnauroth)
|
||||
@ -629,43 +629,22 @@ Release 2.4.0 - UNRELEASED
|
||||
HDFS-5400. DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT constant is set
|
||||
to the wrong value. (Colin Patrick McCabe)
|
||||
|
||||
HDFS-5257. addBlock() retry should return LocatedBlock with locations else client
|
||||
will get AIOBE. (Vinay via jing9)
|
||||
|
||||
HDFS-5427. Not able to read deleted files from snapshot directly under
|
||||
snapshottable dir after checkpoint and NN restart. (Vinay via jing9)
|
||||
|
||||
HDFS-5443. Delete 0-sized block when deleting an under-construction file that
|
||||
is included in snapshot. (jing9)
|
||||
|
||||
HDFS-5476. Snapshot: clean the blocks/files/directories under a renamed
|
||||
file/directory while deletion. (jing9)
|
||||
|
||||
HDFS-5425. Renaming underconstruction file with snapshots can make NN failure on
|
||||
restart. (jing9 and Vinay)
|
||||
|
||||
HDFS-5474. Deletesnapshot can make Namenode in safemode on NN restarts.
|
||||
(Sathish via jing9)
|
||||
|
||||
HDFS-5075. httpfs-config.sh calls out incorrect env script name
|
||||
(Timothy St. Clair via stevel)
|
||||
|
||||
HDFS-5504. In HA mode, OP_DELETE_SNAPSHOT is not decrementing the safemode threshold,
|
||||
leads to NN safemode. (Vinay via jing9)
|
||||
|
||||
HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
|
||||
|
||||
HDFS-5502. Fix HTTPS support in HsftpFileSystem. (Haohui Mai via jing9)
|
||||
|
||||
HDFS-5428. Under construction files deletion after snapshot+checkpoint+nn restart
|
||||
leads nn safemode. (jing9)
|
||||
|
||||
HDFS-5552. Fix wrong information of "Cluster summay" in dfshealth.html.
|
||||
(Haohui Mai via jing9)
|
||||
|
||||
HDFS-5533. Symlink delete/create should be treated as DELETE/CREATE in snapshot diff
|
||||
report. (Binglin Chang via jing9)
|
||||
|
||||
HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
|
||||
(Binglin Chang via junping_du)
|
||||
|
||||
Release 2.3.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
@ -804,6 +783,38 @@ Release 2.3.0 - UNRELEASED
|
||||
HDFS-5353. Short circuit reads fail when dfs.encrypt.data.transfer is
|
||||
enabled. (Colin Patrick McCabe via jing9)
|
||||
|
||||
HDFS-5283. Under construction blocks only inside snapshots should not be
|
||||
counted in safemode threshold. (Vinay via szetszwo)
|
||||
|
||||
HDFS-5257. addBlock() retry should return LocatedBlock with locations else client
|
||||
will get AIOBE. (Vinay via jing9)
|
||||
|
||||
HDFS-5427. Not able to read deleted files from snapshot directly under
|
||||
snapshottable dir after checkpoint and NN restart. (Vinay via jing9)
|
||||
|
||||
HDFS-5443. Delete 0-sized block when deleting an under-construction file that
|
||||
is included in snapshot. (jing9)
|
||||
|
||||
HDFS-5476. Snapshot: clean the blocks/files/directories under a renamed
|
||||
file/directory while deletion. (jing9)
|
||||
|
||||
HDFS-5425. Renaming underconstruction file with snapshots can make NN failure on
|
||||
restart. (jing9 and Vinay)
|
||||
|
||||
HDFS-5474. Deletesnapshot can make Namenode in safemode on NN restarts.
|
||||
(Sathish via jing9)
|
||||
|
||||
HDFS-5504. In HA mode, OP_DELETE_SNAPSHOT is not decrementing the safemode threshold,
|
||||
leads to NN safemode. (Vinay via jing9)
|
||||
|
||||
HDFS-5428. Under construction files deletion after snapshot+checkpoint+nn restart
|
||||
leads nn safemode. (jing9)
|
||||
|
||||
HDFS-5074. Allow starting up from an fsimage checkpoint in the middle of a
|
||||
segment. (Todd Lipcon via atm)
|
||||
|
||||
HDFS-4201. NPE in BPServiceActor#sendHeartBeat. (jxiang via cmccabe)
|
||||
|
||||
Release 2.2.0 - 2013-10-13
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -500,14 +500,9 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||
}
|
||||
}
|
||||
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxId, boolean inProgressOk) throws IOException {
|
||||
selectInputStreams(streams, fromTxId, inProgressOk, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxId, boolean inProgressOk, boolean forReading)
|
||||
long fromTxId, boolean inProgressOk)
|
||||
throws IOException {
|
||||
List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
|
||||
inProgressOk);
|
||||
|
@ -109,7 +109,7 @@ interface AsyncLogger {
|
||||
* Fetch the list of edit logs available on the remote node.
|
||||
*/
|
||||
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
|
||||
long fromTxnId, boolean forReading, boolean inProgressOk);
|
||||
long fromTxnId, boolean inProgressOk);
|
||||
|
||||
/**
|
||||
* Prepare recovery. See the HDFS-3077 design document for details.
|
||||
|
@ -261,13 +261,13 @@ class AsyncLoggerSet {
|
||||
}
|
||||
|
||||
public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
|
||||
long fromTxnId, boolean forReading, boolean inProgressOk) {
|
||||
long fromTxnId, boolean inProgressOk) {
|
||||
Map<AsyncLogger,
|
||||
ListenableFuture<RemoteEditLogManifest>> calls
|
||||
= Maps.newHashMap();
|
||||
for (AsyncLogger logger : loggers) {
|
||||
ListenableFuture<RemoteEditLogManifest> future =
|
||||
logger.getEditLogManifest(fromTxnId, forReading, inProgressOk);
|
||||
logger.getEditLogManifest(fromTxnId, inProgressOk);
|
||||
calls.put(logger, future);
|
||||
}
|
||||
return QuorumCall.create(calls);
|
||||
|
@ -181,6 +181,7 @@ public class IPCLoggerChannel implements AsyncLogger {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
QuorumJournalManager.LOG.info("Closing", new Exception());
|
||||
// No more tasks may be submitted after this point.
|
||||
executor.shutdown();
|
||||
if (proxy != null) {
|
||||
@ -520,13 +521,12 @@ public class IPCLoggerChannel implements AsyncLogger {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
|
||||
final long fromTxnId, final boolean forReading,
|
||||
final boolean inProgressOk) {
|
||||
final long fromTxnId, final boolean inProgressOk) {
|
||||
return executor.submit(new Callable<RemoteEditLogManifest>() {
|
||||
@Override
|
||||
public RemoteEditLogManifest call() throws IOException {
|
||||
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
|
||||
journalId, fromTxnId, forReading, inProgressOk);
|
||||
journalId, fromTxnId, inProgressOk);
|
||||
// Update the http port, since we need this to build URLs to any of the
|
||||
// returned logs.
|
||||
constructHttpServerURI(ret);
|
||||
|
@ -449,18 +449,13 @@ public class QuorumJournalManager implements JournalManager {
|
||||
public void close() throws IOException {
|
||||
loggers.close();
|
||||
}
|
||||
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxnId, boolean inProgressOk) throws IOException {
|
||||
selectInputStreams(streams, fromTxnId, inProgressOk, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException {
|
||||
long fromTxnId, boolean inProgressOk) throws IOException {
|
||||
|
||||
QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
|
||||
loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk);
|
||||
loggers.getEditLogManifest(fromTxnId, inProgressOk);
|
||||
Map<AsyncLogger, RemoteEditLogManifest> resps =
|
||||
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
|
||||
"selectInputStreams");
|
||||
|
@ -123,14 +123,12 @@ public interface QJournalProtocol {
|
||||
/**
|
||||
* @param jid the journal from which to enumerate edits
|
||||
* @param sinceTxId the first transaction which the client cares about
|
||||
* @param forReading whether or not the caller intends to read from the edit
|
||||
* logs
|
||||
* @param inProgressOk whether or not to check the in-progress edit log
|
||||
* segment
|
||||
* @return a list of edit log segments since the given transaction ID.
|
||||
*/
|
||||
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
|
||||
long sinceTxId, boolean forReading, boolean inProgressOk)
|
||||
long sinceTxId, boolean inProgressOk)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -203,7 +203,6 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
|
||||
return impl.getEditLogManifest(
|
||||
request.getJid().getIdentifier(),
|
||||
request.getSinceTxId(),
|
||||
request.getForReading(),
|
||||
request.getInProgressOk());
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
|
@ -228,14 +228,13 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
|
||||
|
||||
@Override
|
||||
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
|
||||
long sinceTxId, boolean forReading, boolean inProgressOk)
|
||||
long sinceTxId, boolean inProgressOk)
|
||||
throws IOException {
|
||||
try {
|
||||
return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
|
||||
GetEditLogManifestRequestProto.newBuilder()
|
||||
.setJid(convertJournalId(jid))
|
||||
.setSinceTxId(sinceTxId)
|
||||
.setForReading(forReading)
|
||||
.setInProgressOk(inProgressOk)
|
||||
.build());
|
||||
} catch (ServiceException e) {
|
||||
|
@ -630,15 +630,12 @@ class Journal implements Closeable {
|
||||
* @see QJournalProtocol#getEditLogManifest(String, long, boolean)
|
||||
*/
|
||||
public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
|
||||
boolean forReading, boolean inProgressOk) throws IOException {
|
||||
boolean inProgressOk) throws IOException {
|
||||
// No need to checkRequest() here - anyone may ask for the list
|
||||
// of segments.
|
||||
checkFormatted();
|
||||
|
||||
// if this is for reading, ignore the in-progress editlog segment
|
||||
inProgressOk = forReading ? false : inProgressOk;
|
||||
List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, forReading,
|
||||
inProgressOk);
|
||||
List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, inProgressOk);
|
||||
|
||||
if (inProgressOk) {
|
||||
RemoteEditLog log = null;
|
||||
|
@ -178,11 +178,11 @@ class JournalNodeRpcServer implements QJournalProtocol {
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
|
||||
long sinceTxId, boolean forReading, boolean inProgressOk)
|
||||
long sinceTxId, boolean inProgressOk)
|
||||
throws IOException {
|
||||
|
||||
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
|
||||
.getEditLogManifest(sinceTxId, forReading, inProgressOk);
|
||||
.getEditLogManifest(sinceTxId, inProgressOk);
|
||||
|
||||
return GetEditLogManifestResponseProto.newBuilder()
|
||||
.setManifest(PBHelper.convert(manifest))
|
||||
|
@ -295,26 +295,27 @@ public class Balancer {
|
||||
*/
|
||||
private boolean chooseProxySource() {
|
||||
final DatanodeInfo targetDN = target.getDatanode();
|
||||
boolean find = false;
|
||||
for (BalancerDatanode loc : block.getLocations()) {
|
||||
// check if there is replica which is on the same rack with the target
|
||||
if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
|
||||
find = true;
|
||||
// if cluster is not nodegroup aware or the proxy is on the same
|
||||
// nodegroup with target, then we already find the nearest proxy
|
||||
if (!cluster.isNodeGroupAware()
|
||||
|| cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
|
||||
// if node group is supported, first try add nodes in the same node group
|
||||
if (cluster.isNodeGroupAware()) {
|
||||
for (BalancerDatanode loc : block.getLocations()) {
|
||||
if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!find) {
|
||||
// find out a non-busy replica out of rack of target
|
||||
find = addTo(loc);
|
||||
}
|
||||
// check if there is replica which is on the same rack with the target
|
||||
for (BalancerDatanode loc : block.getLocations()) {
|
||||
if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return find;
|
||||
// find out a non-busy replica
|
||||
for (BalancerDatanode loc : block.getLocations()) {
|
||||
if (addTo(loc)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// add a BalancerDatanode as proxy source for specific block movement
|
||||
|
@ -277,12 +277,22 @@ class BPOfferService {
|
||||
synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
|
||||
if (this.bpNSInfo == null) {
|
||||
this.bpNSInfo = nsInfo;
|
||||
|
||||
boolean success = false;
|
||||
|
||||
// Now that we know the namespace ID, etc, we can pass this to the DN.
|
||||
// The DN can now initialize its local storage if we are the
|
||||
// first BP to handshake, etc.
|
||||
dn.initBlockPool(this);
|
||||
return;
|
||||
try {
|
||||
dn.initBlockPool(this);
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
// The datanode failed to initialize the BP. We need to reset
|
||||
// the namespace info so that other BPService actors still have
|
||||
// a chance to set it, and re-initialize the datanode.
|
||||
this.bpNSInfo = null;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
|
||||
"Blockpool ID");
|
||||
|
@ -77,7 +77,7 @@ class BackupJournalManager implements JournalManager {
|
||||
|
||||
@Override
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxnId, boolean inProgressOk, boolean forReading) {
|
||||
long fromTxnId, boolean inProgressOk) {
|
||||
// This JournalManager is never used for input. Therefore it cannot
|
||||
// return any transactions
|
||||
}
|
||||
|
@ -286,7 +286,7 @@ public class FSEditLog implements LogsPurgeable {
|
||||
// Safety check: we should never start a segment if there are
|
||||
// newer txids readable.
|
||||
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
|
||||
journalSet.selectInputStreams(streams, segmentTxId, true, true);
|
||||
journalSet.selectInputStreams(streams, segmentTxId, true);
|
||||
if (!streams.isEmpty()) {
|
||||
String error = String.format("Cannot start writing at txid %s " +
|
||||
"when there is a stream available for read: %s",
|
||||
@ -1037,7 +1037,7 @@ public class FSEditLog implements LogsPurgeable {
|
||||
*/
|
||||
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
|
||||
throws IOException {
|
||||
return journalSet.getEditLogManifest(fromTxId, true);
|
||||
return journalSet.getEditLogManifest(fromTxId);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1332,22 +1332,14 @@ public class FSEditLog implements LogsPurgeable {
|
||||
|
||||
@Override
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxId, boolean inProgressOk, boolean forReading) {
|
||||
journalSet.selectInputStreams(streams, fromTxId, inProgressOk, forReading);
|
||||
long fromTxId, boolean inProgressOk) throws IOException {
|
||||
journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
|
||||
}
|
||||
|
||||
public Collection<EditLogInputStream> selectInputStreams(
|
||||
long fromTxId, long toAtLeastTxId) throws IOException {
|
||||
return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
|
||||
}
|
||||
|
||||
/** Select a list of input streams to load */
|
||||
public Collection<EditLogInputStream> selectInputStreams(
|
||||
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
|
||||
boolean inProgressOk) throws IOException {
|
||||
return selectInputStreams(fromTxId, toAtLeastTxId, recovery, inProgressOk,
|
||||
true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Select a list of input streams.
|
||||
@ -1355,13 +1347,12 @@ public class FSEditLog implements LogsPurgeable {
|
||||
* @param fromTxId first transaction in the selected streams
|
||||
* @param toAtLeastTxId the selected streams must contain this transaction
|
||||
* @param inProgressOk set to true if in-progress streams are OK
|
||||
* @param forReading whether or not to use the streams to load the edit log
|
||||
*/
|
||||
public synchronized Collection<EditLogInputStream> selectInputStreams(
|
||||
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
|
||||
boolean inProgressOk, boolean forReading) throws IOException {
|
||||
boolean inProgressOk) throws IOException {
|
||||
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
|
||||
selectInputStreams(streams, fromTxId, inProgressOk, forReading);
|
||||
selectInputStreams(streams, fromTxId, inProgressOk);
|
||||
|
||||
try {
|
||||
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
|
||||
|
@ -167,19 +167,13 @@ public class FileJournalManager implements JournalManager {
|
||||
/**
|
||||
* Find all editlog segments starting at or above the given txid.
|
||||
* @param fromTxId the txnid which to start looking
|
||||
* @param forReading whether or not the caller intends to read from the edit
|
||||
* logs
|
||||
* @param inProgressOk whether or not to include the in-progress edit log
|
||||
* segment
|
||||
* @return a list of remote edit logs
|
||||
* @throws IOException if edit logs cannot be listed.
|
||||
*/
|
||||
public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
|
||||
boolean forReading, boolean inProgressOk) throws IOException {
|
||||
// make sure not reading in-progress edit log, i.e., if forReading is true,
|
||||
// we should ignore the in-progress edit log.
|
||||
Preconditions.checkArgument(!(forReading && inProgressOk));
|
||||
|
||||
boolean inProgressOk) throws IOException {
|
||||
File currentDir = sd.getCurrentDir();
|
||||
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
|
||||
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
|
||||
@ -192,14 +186,9 @@ public class FileJournalManager implements JournalManager {
|
||||
if (elf.getFirstTxId() >= firstTxId) {
|
||||
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
|
||||
} else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
|
||||
// If the firstTxId is in the middle of an edit log segment
|
||||
if (forReading) {
|
||||
// Note that this behavior is different from getLogFiles below.
|
||||
throw new IllegalStateException("Asked for firstTxId " + firstTxId
|
||||
+ " which is in the middle of file " + elf.file);
|
||||
} else {
|
||||
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
|
||||
}
|
||||
// If the firstTxId is in the middle of an edit log segment. Return this
|
||||
// anyway and let the caller figure out whether it wants to use it.
|
||||
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
|
||||
}
|
||||
}
|
||||
|
||||
@ -260,7 +249,7 @@ public class FileJournalManager implements JournalManager {
|
||||
@Override
|
||||
synchronized public void selectInputStreams(
|
||||
Collection<EditLogInputStream> streams, long fromTxId,
|
||||
boolean inProgressOk, boolean forReading) throws IOException {
|
||||
boolean inProgressOk) throws IOException {
|
||||
List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
|
||||
LOG.debug(this + ": selecting input streams starting at " + fromTxId +
|
||||
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
|
||||
|
@ -233,12 +233,10 @@ public class JournalSet implements JournalManager {
|
||||
* may not be sorted-- this is up to the caller.
|
||||
* @param fromTxId The transaction ID to start looking for streams at
|
||||
* @param inProgressOk Should we consider unfinalized streams?
|
||||
* @param forReading Whether or not the caller intends to read from
|
||||
* the returned streams.
|
||||
*/
|
||||
@Override
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxId, boolean inProgressOk, boolean forReading) {
|
||||
long fromTxId, boolean inProgressOk) {
|
||||
final PriorityQueue<EditLogInputStream> allStreams =
|
||||
new PriorityQueue<EditLogInputStream>(64,
|
||||
EDIT_LOG_INPUT_STREAM_COMPARATOR);
|
||||
@ -248,8 +246,7 @@ public class JournalSet implements JournalManager {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk,
|
||||
forReading);
|
||||
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Unable to determine input streams from " + jas.getManager() +
|
||||
". Skipping.", ioe);
|
||||
@ -582,20 +579,20 @@ public class JournalSet implements JournalManager {
|
||||
|
||||
/**
|
||||
* Return a manifest of what finalized edit logs are available. All available
|
||||
* edit logs are returned starting from the transaction id passed.
|
||||
* edit logs are returned starting from the transaction id passed. If
|
||||
* 'fromTxId' falls in the middle of a log, that log is returned as well.
|
||||
*
|
||||
* @param fromTxId Starting transaction id to read the logs.
|
||||
* @return RemoteEditLogManifest object.
|
||||
*/
|
||||
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId,
|
||||
boolean forReading) {
|
||||
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
|
||||
// Collect RemoteEditLogs available from each FileJournalManager
|
||||
List<RemoteEditLog> allLogs = Lists.newArrayList();
|
||||
for (JournalAndStream j : journals) {
|
||||
if (j.getManager() instanceof FileJournalManager) {
|
||||
FileJournalManager fjm = (FileJournalManager)j.getManager();
|
||||
try {
|
||||
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, forReading, false));
|
||||
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, false));
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Cannot list edit logs in " + fjm, t);
|
||||
}
|
||||
|
@ -42,13 +42,11 @@ interface LogsPurgeable {
|
||||
*
|
||||
* @param fromTxId the first transaction id we want to read
|
||||
* @param inProgressOk whether or not in-progress streams should be returned
|
||||
* @param forReading whether or not the caller intends to read from the edit logs
|
||||
*
|
||||
* @return a list of streams
|
||||
* @throws IOException if the underlying storage has an error or is otherwise
|
||||
* inaccessible
|
||||
*/
|
||||
void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxId, boolean inProgressOk, boolean forReading) throws IOException;
|
||||
long fromTxId, boolean inProgressOk) throws IOException;
|
||||
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ public class NNStorageRetentionManager {
|
||||
long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
|
||||
|
||||
ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
|
||||
purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
|
||||
purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
|
||||
Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
|
||||
@Override
|
||||
public int compare(EditLogInputStream a, EditLogInputStream b) {
|
||||
|
@ -827,7 +827,7 @@ public class SecondaryNameNode implements Runnable {
|
||||
|
||||
@Override
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxId, boolean inProgressOk, boolean forReading) {
|
||||
long fromTxId, boolean inProgressOk) {
|
||||
Iterator<StorageDirectory> iter = storage.dirIterator();
|
||||
while (iter.hasNext()) {
|
||||
StorageDirectory dir = iter.next();
|
||||
|
@ -228,7 +228,7 @@ public class BootstrapStandby implements Tool, Configurable {
|
||||
try {
|
||||
Collection<EditLogInputStream> streams =
|
||||
image.getEditLog().selectInputStreams(
|
||||
firstTxIdInLogs, curTxIdOnOtherNode, null, true, false);
|
||||
firstTxIdInLogs, curTxIdOnOtherNode, null, true);
|
||||
for (EditLogInputStream stream : streams) {
|
||||
IOUtils.closeStream(stream);
|
||||
}
|
||||
|
@ -165,7 +165,7 @@ public class EditLogTailer {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setEditLog(FSEditLog editLog) {
|
||||
public void setEditLog(FSEditLog editLog) {
|
||||
this.editLog = editLog;
|
||||
}
|
||||
|
||||
|
@ -178,7 +178,7 @@ message GetEditLogManifestRequestProto {
|
||||
required JournalIdProto jid = 1;
|
||||
required uint64 sinceTxId = 2; // Transaction ID
|
||||
// Whether or not the client will be reading from the returned streams.
|
||||
optional bool forReading = 3 [default = true];
|
||||
// optional bool forReading = 3 [default = true]; <obsolete, do not reuse>
|
||||
optional bool inProgressOk = 4 [default = false];
|
||||
}
|
||||
|
||||
|
@ -1494,8 +1494,9 @@ public class MiniDFSCluster {
|
||||
*/
|
||||
public synchronized void restartNameNodes() throws IOException {
|
||||
for (int i = 0; i < nameNodes.length; i++) {
|
||||
restartNameNode(i);
|
||||
restartNameNode(i, false);
|
||||
}
|
||||
waitActive();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,134 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.qjournal;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
|
||||
|
||||
public class MiniQJMHACluster {
|
||||
private MiniDFSCluster cluster;
|
||||
private MiniJournalCluster journalCluster;
|
||||
private final Configuration conf;
|
||||
|
||||
private static String NAMESERVICE = "ns1";
|
||||
private static final String NN1 = "nn1";
|
||||
private static final String NN2 = "nn2";
|
||||
private static final int NN1_IPC_PORT = 10000;
|
||||
private static final int NN1_INFO_PORT = 10001;
|
||||
private static final int NN2_IPC_PORT = 10002;
|
||||
private static final int NN2_INFO_PORT = 10003;
|
||||
|
||||
public static class Builder {
|
||||
private final Configuration conf;
|
||||
private final MiniDFSCluster.Builder dfsBuilder;
|
||||
|
||||
public Builder(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.dfsBuilder = new MiniDFSCluster.Builder(conf);
|
||||
}
|
||||
|
||||
public MiniDFSCluster.Builder getDfsBuilder() {
|
||||
return dfsBuilder;
|
||||
}
|
||||
|
||||
public MiniQJMHACluster build() throws IOException {
|
||||
return new MiniQJMHACluster(this);
|
||||
}
|
||||
}
|
||||
|
||||
public static MiniDFSNNTopology createDefaultTopology() {
|
||||
return new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
|
||||
new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
|
||||
.setHttpPort(NN1_INFO_PORT)).addNN(
|
||||
new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
|
||||
.setHttpPort(NN2_INFO_PORT)));
|
||||
}
|
||||
|
||||
private MiniQJMHACluster(Builder builder) throws IOException {
|
||||
this.conf = builder.conf;
|
||||
// start 3 journal nodes
|
||||
journalCluster = new MiniJournalCluster.Builder(conf).format(true)
|
||||
.build();
|
||||
URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
|
||||
|
||||
// start cluster with 2 NameNodes
|
||||
MiniDFSNNTopology topology = createDefaultTopology();
|
||||
|
||||
initHAConf(journalURI, builder.conf);
|
||||
|
||||
// First start up the NNs just to format the namespace. The MiniDFSCluster
|
||||
// has no way to just format the NameNodes without also starting them.
|
||||
cluster = builder.dfsBuilder.nnTopology(topology)
|
||||
.manageNameDfsSharedDirs(false).build();
|
||||
cluster.waitActive();
|
||||
cluster.shutdown();
|
||||
|
||||
// initialize the journal nodes
|
||||
Configuration confNN0 = cluster.getConfiguration(0);
|
||||
NameNode.initializeSharedEdits(confNN0, true);
|
||||
|
||||
// restart the cluster
|
||||
cluster.restartNameNodes();
|
||||
}
|
||||
|
||||
private Configuration initHAConf(URI journalURI, Configuration conf) {
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
journalURI.toString());
|
||||
|
||||
String address1 = "127.0.0.1:" + NN1_IPC_PORT;
|
||||
String address2 = "127.0.0.1:" + NN2_IPC_PORT;
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
NAMESERVICE, NN1), address1);
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
NAMESERVICE, NN2), address2);
|
||||
conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
|
||||
NN1 + "," + NN2);
|
||||
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
|
||||
ConfiguredFailoverProxyProvider.class.getName());
|
||||
conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
public MiniDFSCluster getDfsCluster() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
public MiniJournalCluster getJournalCluster() {
|
||||
return journalCluster;
|
||||
}
|
||||
|
||||
public void shutdown() throws IOException {
|
||||
cluster.shutdown();
|
||||
journalCluster.shutdown();
|
||||
}
|
||||
}
|
@ -18,7 +18,6 @@
|
||||
package org.apache.hadoop.hdfs.qjournal;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assume.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -916,7 +916,7 @@ public class TestQuorumJournalManager {
|
||||
NNStorage.getFinalizedEditsFileName(41, 50));
|
||||
|
||||
ArrayList<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
|
||||
qjm.selectInputStreams(streams, 25, false, false);
|
||||
qjm.selectInputStreams(streams, 25, false);
|
||||
|
||||
verifyEdits(streams, 25, 50);
|
||||
}
|
||||
|
@ -25,7 +25,9 @@ import static org.junit.Assert.assertSame;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -294,6 +296,47 @@ public class TestBPOfferService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test datanode block pool initialization error handling.
|
||||
* Failure in initializing a block pool should not cause NPE.
|
||||
*/
|
||||
@Test
|
||||
public void testBPInitErrorHandling() throws Exception {
|
||||
final DataNode mockDn = Mockito.mock(DataNode.class);
|
||||
Mockito.doReturn(true).when(mockDn).shouldRun();
|
||||
Configuration conf = new Configuration();
|
||||
File dnDataDir = new File(
|
||||
new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data");
|
||||
conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString());
|
||||
Mockito.doReturn(conf).when(mockDn).getConf();
|
||||
Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf();
|
||||
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
|
||||
when(mockDn).getMetrics();
|
||||
final AtomicInteger count = new AtomicInteger();
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
if (count.getAndIncrement() == 0) {
|
||||
throw new IOException("faked initBlockPool exception");
|
||||
}
|
||||
// The initBlockPool is called again. Now mock init is done.
|
||||
Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
|
||||
return null;
|
||||
}
|
||||
}).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
|
||||
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
|
||||
bpos.start();
|
||||
try {
|
||||
waitForInitialization(bpos);
|
||||
List<BPServiceActor> actors = bpos.getBPServiceActors();
|
||||
assertEquals(1, actors.size());
|
||||
BPServiceActor actor = actors.get(0);
|
||||
waitForBlockReport(actor.getNameNodeProxy());
|
||||
} finally {
|
||||
bpos.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForOneToFail(final BPOfferService bpos)
|
||||
throws Exception {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@ -311,6 +354,11 @@ public class TestBPOfferService {
|
||||
*/
|
||||
private BPOfferService setupBPOSForNNs(
|
||||
DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
|
||||
return setupBPOSForNNs(mockDn, nns);
|
||||
}
|
||||
|
||||
private BPOfferService setupBPOSForNNs(DataNode mockDn,
|
||||
DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
|
||||
// Set up some fake InetAddresses, then override the connectToNN
|
||||
// function to return the corresponding proxies.
|
||||
|
||||
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
@ -181,6 +182,16 @@ public class NameNodeAdapter {
|
||||
return spy;
|
||||
}
|
||||
|
||||
public static FSEditLog spyOnEditLog(NameNode nn) {
|
||||
FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
|
||||
nn.getFSImage().setEditLogForTesting(spyEditLog);
|
||||
EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
|
||||
if (tailer != null) {
|
||||
tailer.setEditLog(spyEditLog);
|
||||
}
|
||||
return spyEditLog;
|
||||
}
|
||||
|
||||
public static JournalSet spyOnJournalSet(NameNode nn) {
|
||||
FSEditLog editLog = nn.getFSImage().getEditLog();
|
||||
JournalSet js = Mockito.spy(editLog.getJournalSet());
|
||||
|
@ -82,7 +82,7 @@ public class TestFileJournalManager {
|
||||
final PriorityQueue<EditLogInputStream> allStreams =
|
||||
new PriorityQueue<EditLogInputStream>(64,
|
||||
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
|
||||
jm.selectInputStreams(allStreams, fromTxId, inProgressOk, true);
|
||||
jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
|
||||
EditLogInputStream elis = null;
|
||||
try {
|
||||
while ((elis = allStreams.poll()) != null) {
|
||||
@ -378,14 +378,8 @@ public class TestFileJournalManager {
|
||||
FileJournalManager fjm = new FileJournalManager(conf, sd, null);
|
||||
assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1));
|
||||
assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101));
|
||||
assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 150));
|
||||
assertEquals("[1001,1100]", getLogsAsString(fjm, 201));
|
||||
try {
|
||||
assertEquals("[]", getLogsAsString(fjm, 150));
|
||||
fail("Did not throw when asking for a txn in the middle of a log");
|
||||
} catch (IllegalStateException ioe) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"150 which is in the middle", ioe);
|
||||
}
|
||||
assertEquals("Asking for a newer log than exists should return empty list",
|
||||
"", getLogsAsString(fjm, 9999));
|
||||
}
|
||||
@ -404,7 +398,7 @@ public class TestFileJournalManager {
|
||||
final PriorityQueue<EditLogInputStream> allStreams =
|
||||
new PriorityQueue<EditLogInputStream>(64,
|
||||
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
|
||||
jm.selectInputStreams(allStreams, txId, inProgressOk, true);
|
||||
jm.selectInputStreams(allStreams, txId, inProgressOk);
|
||||
EditLogInputStream elis = null, ret;
|
||||
try {
|
||||
while ((elis = allStreams.poll()) != null) {
|
||||
@ -482,6 +476,6 @@ public class TestFileJournalManager {
|
||||
|
||||
private static String getLogsAsString(
|
||||
FileJournalManager fjm, long firstTxId) throws IOException {
|
||||
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, true, false));
|
||||
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, false));
|
||||
}
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ public class TestGenericJournalConf {
|
||||
|
||||
@Override
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxnId, boolean inProgressOk, boolean forReading) {
|
||||
long fromTxnId, boolean inProgressOk) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -324,7 +324,7 @@ public class TestNNStorageRetentionManager {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public FSEditLog mockEditLog(StoragePurger purger) {
|
||||
public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
|
||||
final List<JournalManager> jms = Lists.newArrayList();
|
||||
final JournalSet journalSet = new JournalSet(0);
|
||||
for (FakeRoot root : dirRoots.values()) {
|
||||
@ -360,12 +360,11 @@ public class TestNNStorageRetentionManager {
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
Object[] args = invocation.getArguments();
|
||||
journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
|
||||
(long)((Long)args[1]), (boolean)((Boolean)args[2]),
|
||||
(boolean)((Boolean)args[3]));
|
||||
(long)((Long)args[1]), (boolean)((Boolean)args[2]));
|
||||
return null;
|
||||
}
|
||||
}).when(mockLog).selectInputStreams(Mockito.anyCollection(),
|
||||
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
|
||||
Mockito.anyLong(), Mockito.anyBoolean());
|
||||
return mockLog;
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapsho
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -57,7 +58,7 @@ public class TestSnapshotPathINodes {
|
||||
static private DistributedFileSystem hdfs;
|
||||
|
||||
@BeforeClass
|
||||
static public void setUp() throws Exception {
|
||||
public static void setUp() throws Exception {
|
||||
conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(REPLICATION)
|
||||
@ -68,12 +69,16 @@ public class TestSnapshotPathINodes {
|
||||
fsdir = fsn.getFSDirectory();
|
||||
|
||||
hdfs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void reset() throws Exception {
|
||||
DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, seed);
|
||||
DFSTestUtil.createFile(hdfs, file2, 1024, REPLICATION, seed);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
static public void tearDown() throws Exception {
|
||||
public static void tearDown() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
@ -251,6 +256,8 @@ public class TestSnapshotPathINodes {
|
||||
System.out.println("The exception is expected: " + fnfe);
|
||||
}
|
||||
}
|
||||
hdfs.deleteSnapshot(sub1, "s1");
|
||||
hdfs.disallowSnapshot(sub1);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -308,6 +315,8 @@ public class TestSnapshotPathINodes {
|
||||
sub1.toString());
|
||||
assertEquals(inodes[components.length - 3].getFullPathName(),
|
||||
dir.toString());
|
||||
hdfs.deleteSnapshot(sub1, "s2");
|
||||
hdfs.disallowSnapshot(sub1);
|
||||
}
|
||||
|
||||
static private Snapshot s4;
|
||||
@ -367,6 +376,8 @@ public class TestSnapshotPathINodes {
|
||||
sub1.toString());
|
||||
assertEquals(inodes[components.length - 3].getFullPathName(),
|
||||
dir.toString());
|
||||
hdfs.deleteSnapshot(sub1, "s4");
|
||||
hdfs.disallowSnapshot(sub1);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -375,9 +386,6 @@ public class TestSnapshotPathINodes {
|
||||
*/
|
||||
@Test (timeout=15000)
|
||||
public void testSnapshotPathINodesAfterModification() throws Exception {
|
||||
//file1 was deleted, create it again.
|
||||
DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, seed);
|
||||
|
||||
// First check the INode for /TestSnapshot/sub1/file1
|
||||
String[] names = INode.getPathNames(file1.toString());
|
||||
byte[][] components = INode.getPathComponents(names);
|
||||
@ -385,7 +393,6 @@ public class TestSnapshotPathINodes {
|
||||
INode[] inodes = nodesInPath.getINodes();
|
||||
// The number of inodes should be equal to components.length
|
||||
assertEquals(inodes.length, components.length);
|
||||
assertSnapshot(nodesInPath, false, s4, -1);
|
||||
|
||||
// The last INode should be associated with file1
|
||||
assertEquals(inodes[components.length - 1].getFullPathName(),
|
||||
@ -434,5 +441,7 @@ public class TestSnapshotPathINodes {
|
||||
assertEquals(newInodes[last].getFullPathName(), file1.toString());
|
||||
// The modification time of the INode for file1 should have been changed
|
||||
Assert.assertFalse(modTime == newInodes[last].getModificationTime());
|
||||
hdfs.deleteSnapshot(sub1, "s3");
|
||||
hdfs.disallowSnapshot(sub1);
|
||||
}
|
||||
}
|
||||
|
@ -17,24 +17,18 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -44,51 +38,23 @@ import com.google.common.collect.ImmutableList;
|
||||
/**
|
||||
* Test BootstrapStandby when QJM is used for shared edits.
|
||||
*/
|
||||
public class TestBootstrapStandbyWithQJM {
|
||||
|
||||
private static final String NAMESERVICE = "ns1";
|
||||
private static final String NN1 = "nn1";
|
||||
private static final String NN2 = "nn2";
|
||||
private static final int NUM_JN = 3;
|
||||
private static final int NN1_IPC_PORT = 10000;
|
||||
private static final int NN1_INFO_PORT = 10001;
|
||||
private static final int NN2_IPC_PORT = 10002;
|
||||
private static final int NN2_INFO_PORT = 10003;
|
||||
|
||||
public class TestBootstrapStandbyWithQJM {
|
||||
private MiniQJMHACluster miniQjmHaCluster;
|
||||
private MiniDFSCluster cluster;
|
||||
private MiniJournalCluster jCluster;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
// start 3 journal nodes
|
||||
jCluster = new MiniJournalCluster.Builder(new Configuration()).format(true)
|
||||
.numJournalNodes(NUM_JN).build();
|
||||
URI journalURI = jCluster.getQuorumJournalURI(NAMESERVICE);
|
||||
|
||||
// start cluster with 2 NameNodes
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
|
||||
new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
|
||||
.setHttpPort(NN1_INFO_PORT)).addNN(
|
||||
new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
|
||||
.setHttpPort(NN2_INFO_PORT)));
|
||||
|
||||
Configuration conf = initHAConf(journalURI);
|
||||
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
|
||||
.numDataNodes(1).manageNameDfsSharedDirs(false).build();
|
||||
cluster.waitActive();
|
||||
|
||||
Configuration confNN0 = new Configuration(conf);
|
||||
cluster.shutdown();
|
||||
// initialize the journal nodes
|
||||
confNN0.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
|
||||
NameNode.initializeSharedEdits(confNN0, true);
|
||||
|
||||
// restart the cluster
|
||||
cluster = new MiniDFSCluster.Builder(conf).format(false)
|
||||
.nnTopology(topology).numDataNodes(1).manageNameDfsSharedDirs(false)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
Configuration conf = new Configuration();
|
||||
// Turn off IPC client caching, so that the suite can handle
|
||||
// the restart of the daemons between test cases.
|
||||
conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
||||
0);
|
||||
|
||||
miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
|
||||
cluster = miniQjmHaCluster.getDfsCluster();
|
||||
jCluster = miniQjmHaCluster.getJournalCluster();
|
||||
|
||||
// make nn0 active
|
||||
cluster.transitionToActive(0);
|
||||
@ -109,27 +75,6 @@ public class TestBootstrapStandbyWithQJM {
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration initHAConf(URI journalURI) {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
journalURI.toString());
|
||||
|
||||
String address1 = "127.0.0.1:" + NN1_IPC_PORT;
|
||||
String address2 = "127.0.0.1:" + NN2_IPC_PORT;
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
NAMESERVICE, NN1), address1);
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
NAMESERVICE, NN2), address2);
|
||||
conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
|
||||
NN1 + "," + NN2);
|
||||
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
|
||||
ConfiguredFailoverProxyProvider.class.getName());
|
||||
conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
/** BootstrapStandby when the existing NN is standby */
|
||||
@Test
|
||||
public void testBootstrapStandbyWithStandbyNN() throws Exception {
|
||||
|
@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
|
||||
@ -37,6 +38,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
|
||||
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
||||
@ -48,23 +51,49 @@ import org.apache.hadoop.util.ExitUtil.ExitException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestFailureToReadEdits {
|
||||
|
||||
private static final String TEST_DIR1 = "/test1";
|
||||
private static final String TEST_DIR2 = "/test2";
|
||||
private static final String TEST_DIR3 = "/test3";
|
||||
|
||||
private final TestType clusterType;
|
||||
private Configuration conf;
|
||||
private MiniDFSCluster cluster;
|
||||
private MiniQJMHACluster miniQjmHaCluster; // for QJM case only
|
||||
private NameNode nn0;
|
||||
private NameNode nn1;
|
||||
private FileSystem fs;
|
||||
|
||||
private static enum TestType {
|
||||
SHARED_DIR_HA,
|
||||
QJM_HA;
|
||||
};
|
||||
|
||||
/**
|
||||
* Run this suite of tests both for QJM-based HA and for file-based
|
||||
* HA.
|
||||
*/
|
||||
@Parameters
|
||||
public static Iterable<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{ TestType.SHARED_DIR_HA },
|
||||
{ TestType.QJM_HA } });
|
||||
}
|
||||
|
||||
public TestFailureToReadEdits(TestType clusterType) {
|
||||
this.clusterType = clusterType;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpCluster() throws Exception {
|
||||
conf = new Configuration();
|
||||
@ -74,16 +103,19 @@ public class TestFailureToReadEdits {
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10041))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10042)));
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(topology)
|
||||
.numDataNodes(0)
|
||||
.checkExitOnShutdown(false)
|
||||
.build();
|
||||
|
||||
if (clusterType == TestType.SHARED_DIR_HA) {
|
||||
MiniDFSNNTopology topology = MiniQJMHACluster.createDefaultTopology();
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(topology)
|
||||
.numDataNodes(0)
|
||||
.checkExitOnShutdown(false)
|
||||
.build();
|
||||
} else {
|
||||
Builder builder = new MiniQJMHACluster.Builder(conf);
|
||||
builder.getDfsBuilder().numDataNodes(0).checkExitOnShutdown(false);
|
||||
miniQjmHaCluster = builder.build();
|
||||
cluster = miniQjmHaCluster.getDfsCluster();
|
||||
}
|
||||
cluster.waitActive();
|
||||
|
||||
nn0 = cluster.getNameNode(0);
|
||||
@ -99,8 +131,14 @@ public class TestFailureToReadEdits {
|
||||
fs.close();
|
||||
}
|
||||
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
if (clusterType == TestType.SHARED_DIR_HA) {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
} else {
|
||||
if (miniQjmHaCluster != null) {
|
||||
miniQjmHaCluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -259,13 +297,10 @@ public class TestFailureToReadEdits {
|
||||
}
|
||||
|
||||
private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException {
|
||||
FSEditLog spyEditLog = spy(nn1.getNamesystem().getEditLogTailer()
|
||||
.getEditLog());
|
||||
FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
|
||||
LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
|
||||
doAnswer(answer).when(spyEditLog).selectInputStreams(
|
||||
anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean());
|
||||
nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
|
||||
|
||||
return answer;
|
||||
}
|
||||
|
||||
|
@ -154,6 +154,14 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1378. Implemented a cleaner of old finished applications from the RM
|
||||
state-store. (Jian He via vinodkv)
|
||||
|
||||
YARN-1481. Move internal services logic from AdminService to ResourceManager.
|
||||
(vinodkv via kasha)
|
||||
|
||||
YARN-1491. Upgrade JUnit3 TestCase to JUnit 4 (Chen He via jeagles)
|
||||
|
||||
YARN-408. Change CapacityScheduler to not disable delay-scheduling by default.
|
||||
(Mayank Bansal via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -23,16 +23,16 @@ import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* A JUnit test to test {@link LinuxResourceCalculatorPlugin}
|
||||
* Create the fake /proc/ information and verify the parsing and calculation
|
||||
*/
|
||||
public class TestLinuxResourceCalculatorPlugin extends TestCase {
|
||||
public class TestLinuxResourceCalculatorPlugin {
|
||||
/**
|
||||
* LinuxResourceCalculatorPlugin with a fake timer
|
||||
*/
|
||||
@ -145,7 +145,7 @@ public class TestLinuxResourceCalculatorPlugin extends TestCase {
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testParsingProcStatAndCpuFile() throws IOException {
|
||||
public void parsingProcStatAndCpuFile() throws IOException {
|
||||
// Write fake /proc/cpuinfo file.
|
||||
long numProcessors = 8;
|
||||
long cpuFrequencyKHz = 2392781;
|
||||
@ -171,7 +171,7 @@ public class TestLinuxResourceCalculatorPlugin extends TestCase {
|
||||
updateStatFile(uTime, nTime, sTime);
|
||||
assertEquals(plugin.getCumulativeCpuTime(),
|
||||
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
|
||||
assertEquals(plugin.getCpuUsage(), (float)(LinuxResourceCalculatorPlugin.UNAVAILABLE));
|
||||
assertEquals(plugin.getCpuUsage(), (float)(LinuxResourceCalculatorPlugin.UNAVAILABLE),0.0);
|
||||
|
||||
// Advance the time and sample again to test the CPU usage calculation
|
||||
uTime += 100L;
|
||||
@ -179,13 +179,13 @@ public class TestLinuxResourceCalculatorPlugin extends TestCase {
|
||||
updateStatFile(uTime, nTime, sTime);
|
||||
assertEquals(plugin.getCumulativeCpuTime(),
|
||||
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
|
||||
assertEquals(plugin.getCpuUsage(), 6.25F);
|
||||
assertEquals(plugin.getCpuUsage(), 6.25F, 0.0);
|
||||
|
||||
// Advance the time and sample again. This time, we call getCpuUsage() only.
|
||||
uTime += 600L;
|
||||
plugin.advanceTime(300L);
|
||||
updateStatFile(uTime, nTime, sTime);
|
||||
assertEquals(plugin.getCpuUsage(), 25F);
|
||||
assertEquals(plugin.getCpuUsage(), 25F, 0.0);
|
||||
|
||||
// Advance very short period of time (one jiffy length).
|
||||
// In this case, CPU usage should not be updated.
|
||||
@ -194,7 +194,7 @@ public class TestLinuxResourceCalculatorPlugin extends TestCase {
|
||||
updateStatFile(uTime, nTime, sTime);
|
||||
assertEquals(plugin.getCumulativeCpuTime(),
|
||||
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
|
||||
assertEquals(plugin.getCpuUsage(), 25F); // CPU usage is not updated.
|
||||
assertEquals(plugin.getCpuUsage(), 25F, 0.0); // CPU usage is not updated.
|
||||
}
|
||||
|
||||
/**
|
||||
@ -212,7 +212,7 @@ public class TestLinuxResourceCalculatorPlugin extends TestCase {
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testParsingProcMemFile() throws IOException {
|
||||
public void parsingProcMemFile() throws IOException {
|
||||
long memTotal = 4058864L;
|
||||
long memFree = 99632L;
|
||||
long inactive = 567732L;
|
||||
|
@ -22,10 +22,10 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestWindowsBasedProcessTree extends TestCase {
|
||||
public class TestWindowsBasedProcessTree {
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(TestWindowsBasedProcessTree.class);
|
||||
|
||||
@ -41,7 +41,7 @@ public class TestWindowsBasedProcessTree extends TestCase {
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testTree() {
|
||||
public void tree() {
|
||||
if( !Shell.WINDOWS) {
|
||||
LOG.info("Platform not Windows. Not testing");
|
||||
return;
|
||||
|
@ -18,10 +18,10 @@
|
||||
|
||||
package org.apache.hadoop.yarn.util;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestWindowsResourceCalculatorPlugin extends TestCase {
|
||||
public class TestWindowsResourceCalculatorPlugin {
|
||||
|
||||
|
||||
class WindowsResourceCalculatorPluginTester extends WindowsResourceCalculatorPlugin {
|
||||
@ -33,7 +33,7 @@ public class TestWindowsResourceCalculatorPlugin extends TestCase {
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testParseSystemInfoString() {
|
||||
public void parseSystemInfoString() {
|
||||
WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester();
|
||||
// info str derived from windows shell command has \r\n termination
|
||||
tester.infoStr = "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n";
|
||||
@ -51,7 +51,7 @@ public class TestWindowsResourceCalculatorPlugin extends TestCase {
|
||||
}
|
||||
|
||||
@Test (timeout = 20000)
|
||||
public void testRefreshAndCpuUsage() throws InterruptedException {
|
||||
public void refreshAndCpuUsage() throws InterruptedException {
|
||||
WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester();
|
||||
// info str derived from windows shell command has \r\n termination
|
||||
tester.infoStr = "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n";
|
||||
@ -75,7 +75,7 @@ public class TestWindowsResourceCalculatorPlugin extends TestCase {
|
||||
}
|
||||
|
||||
@Test (timeout = 20000)
|
||||
public void testErrorInGetSystemInfo() {
|
||||
public void errorInGetSystemInfo() {
|
||||
WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester();
|
||||
// info str derived from windows shell command has \r\n termination
|
||||
tester.infoStr = null;
|
||||
|
@ -18,23 +18,23 @@
|
||||
|
||||
package org.apache.hadoop.yarn.util;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
/**
|
||||
* A JUnit test to test {@link YarnVersionInfo}
|
||||
*/
|
||||
public class TestYarnVersionInfo extends TestCase {
|
||||
public class TestYarnVersionInfo {
|
||||
|
||||
/**
|
||||
* Test the yarn version info routines.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testVersionInfoGenerated() throws IOException {
|
||||
public void versionInfoGenerated() throws IOException {
|
||||
|
||||
// can't easily know what the correct values are going to be so just
|
||||
// make sure they aren't Unknown
|
||||
|
@ -99,12 +99,12 @@
|
||||
|
||||
<property>
|
||||
<name>yarn.scheduler.capacity.node-locality-delay</name>
|
||||
<value>-1</value>
|
||||
<value>40</value>
|
||||
<description>
|
||||
Number of missed scheduling opportunities after which the CapacityScheduler
|
||||
attempts to schedule rack-local containers.
|
||||
Typically this should be set to number of racks in the cluster, this
|
||||
feature is disabled by default, set to -1.
|
||||
Typically this should be set to the number of nodes in the cluster. By default it is set to
|
||||
approximately the number of nodes in one rack, which is 40.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -43,7 +41,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
@ -66,6 +63,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
public class AdminService extends AbstractService implements
|
||||
HAServiceProtocol, ResourceManagerAdministrationProtocol {
|
||||
|
||||
@ -73,10 +72,6 @@ public class AdminService extends AbstractService implements
|
||||
|
||||
private final RMContext rmContext;
|
||||
private final ResourceManager rm;
|
||||
@VisibleForTesting
|
||||
protected HAServiceProtocol.HAServiceState
|
||||
haState = HAServiceProtocol.HAServiceState.INITIALIZING;
|
||||
boolean haEnabled;
|
||||
|
||||
private Server server;
|
||||
private InetSocketAddress masterServiceAddress;
|
||||
@ -93,13 +88,6 @@ public class AdminService extends AbstractService implements
|
||||
|
||||
@Override
|
||||
public synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
haEnabled = HAUtil.isHAEnabled(conf);
|
||||
if (haEnabled) {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
rm.setConf(conf);
|
||||
}
|
||||
rm.createAndInitActiveServices();
|
||||
|
||||
masterServiceAddress = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||
@ -112,11 +100,6 @@ public class AdminService extends AbstractService implements
|
||||
|
||||
@Override
|
||||
protected synchronized void serviceStart() throws Exception {
|
||||
if (haEnabled) {
|
||||
transitionToStandby(true);
|
||||
} else {
|
||||
transitionToActive();
|
||||
}
|
||||
startServer();
|
||||
super.serviceStart();
|
||||
}
|
||||
@ -124,8 +107,6 @@ public class AdminService extends AbstractService implements
|
||||
@Override
|
||||
protected synchronized void serviceStop() throws Exception {
|
||||
stopServer();
|
||||
transitionToStandby(false);
|
||||
haState = HAServiceState.STOPPING;
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@ -145,7 +126,7 @@ public class AdminService extends AbstractService implements
|
||||
refreshServiceAcls(conf, new RMPolicyProvider());
|
||||
}
|
||||
|
||||
if (haEnabled) {
|
||||
if (rmContext.isHAEnabled()) {
|
||||
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
|
||||
@ -182,39 +163,27 @@ public class AdminService extends AbstractService implements
|
||||
}
|
||||
|
||||
private synchronized boolean isRMActive() {
|
||||
return HAServiceState.ACTIVE == haState;
|
||||
return HAServiceState.ACTIVE == rmContext.getHAServiceState();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void monitorHealth()
|
||||
throws IOException {
|
||||
checkAccess("monitorHealth");
|
||||
if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
|
||||
if (isRMActive() && !rm.areActiveServicesRunning()) {
|
||||
throw new HealthCheckFailedException(
|
||||
"Active ResourceManager services are not running!");
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void transitionToActive() throws Exception {
|
||||
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
LOG.info("Already in active state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to active");
|
||||
rm.startActiveServices();
|
||||
haState = HAServiceProtocol.HAServiceState.ACTIVE;
|
||||
LOG.info("Transitioned to active");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
|
||||
throws IOException {
|
||||
public synchronized void transitionToActive(
|
||||
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
|
||||
UserGroupInformation user = checkAccess("transitionToActive");
|
||||
// TODO (YARN-1177): When automatic failover is enabled,
|
||||
// check if transition should be allowed for this request
|
||||
try {
|
||||
transitionToActive();
|
||||
rm.transitionToActive();
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"transitionToActive", "RMHAProtocolService");
|
||||
} catch (Exception e) {
|
||||
@ -226,32 +195,14 @@ public class AdminService extends AbstractService implements
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void transitionToStandby(boolean initialize)
|
||||
throws Exception {
|
||||
if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
LOG.info("Already in standby state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to standby");
|
||||
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
rm.stopActiveServices();
|
||||
if (initialize) {
|
||||
rm.createAndInitActiveServices();
|
||||
}
|
||||
}
|
||||
haState = HAServiceProtocol.HAServiceState.STANDBY;
|
||||
LOG.info("Transitioned to standby");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
|
||||
throws IOException {
|
||||
public synchronized void transitionToStandby(
|
||||
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
|
||||
UserGroupInformation user = checkAccess("transitionToStandby");
|
||||
// TODO (YARN-1177): When automatic failover is enabled,
|
||||
// check if transition should be allowed for this request
|
||||
try {
|
||||
transitionToStandby(true);
|
||||
rm.transitionToStandby(true);
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"transitionToStandby", "RMHAProtocolService");
|
||||
} catch (Exception e) {
|
||||
@ -266,15 +217,15 @@ public class AdminService extends AbstractService implements
|
||||
@Override
|
||||
public synchronized HAServiceStatus getServiceStatus() throws IOException {
|
||||
checkAccess("getServiceState");
|
||||
HAServiceState haState = rmContext.getHAServiceState();
|
||||
HAServiceStatus ret = new HAServiceStatus(haState);
|
||||
if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
|
||||
HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
ret.setReadyToBecomeActive();
|
||||
} else {
|
||||
ret.setNotReadyToBecomeActive("State is " + haState);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
@ -42,7 +43,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenS
|
||||
public interface RMContext {
|
||||
|
||||
Dispatcher getDispatcher();
|
||||
|
||||
|
||||
boolean isHAEnabled();
|
||||
|
||||
HAServiceState getHAServiceState();
|
||||
|
||||
RMStateStore getStateStore();
|
||||
|
||||
ConcurrentMap<ApplicationId, RMApp> getRMApps();
|
||||
|
@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
@ -35,8 +37,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -54,6 +56,10 @@ public class RMContextImpl implements RMContext {
|
||||
private final ConcurrentMap<String, RMNode> inactiveNodes
|
||||
= new ConcurrentHashMap<String, RMNode>();
|
||||
|
||||
private boolean isHAEnabled;
|
||||
private HAServiceState haServiceState =
|
||||
HAServiceProtocol.HAServiceState.INITIALIZING;
|
||||
|
||||
private AMLivelinessMonitor amLivelinessMonitor;
|
||||
private AMLivelinessMonitor amFinishingMonitor;
|
||||
private RMStateStore stateStore = null;
|
||||
@ -211,6 +217,16 @@ public class RMContextImpl implements RMContext {
|
||||
return resourceTrackerService;
|
||||
}
|
||||
|
||||
void setHAEnabled(boolean isHAEnabled) {
|
||||
this.isHAEnabled = isHAEnabled;
|
||||
}
|
||||
|
||||
void setHAServiceState(HAServiceState haServiceState) {
|
||||
synchronized (haServiceState) {
|
||||
this.haServiceState = haServiceState;
|
||||
}
|
||||
}
|
||||
|
||||
void setDispatcher(Dispatcher dispatcher) {
|
||||
this.rmDispatcher = dispatcher;
|
||||
}
|
||||
@ -290,4 +306,16 @@ public class RMContextImpl implements RMContext {
|
||||
ResourceTrackerService resourceTrackerService) {
|
||||
this.resourceTrackerService = resourceTrackerService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHAEnabled() {
|
||||
return isHAEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HAServiceState getHAServiceState() {
|
||||
synchronized (haServiceState) {
|
||||
return haServiceState;
|
||||
}
|
||||
}
|
||||
}
|
@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.http.HttpConfig.Policy;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
@ -188,6 +191,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
addService(adminService);
|
||||
rmContext.setRMAdminService(adminService);
|
||||
|
||||
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf));
|
||||
if (this.rmContext.isHAEnabled()) {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
}
|
||||
createAndInitActiveServices();
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@ -217,9 +226,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
}
|
||||
|
||||
protected RMStateStoreOperationFailedEventDispatcher
|
||||
createRMStateStoreOperationFailedEventDispatcher() {
|
||||
return new RMStateStoreOperationFailedEventDispatcher(
|
||||
rmContext.getRMAdminService());
|
||||
createRMStateStoreOperationFailedEventDispatcher() {
|
||||
return new RMStateStoreOperationFailedEventDispatcher(rmContext, this);
|
||||
}
|
||||
|
||||
protected Dispatcher createDispatcher() {
|
||||
@ -655,11 +663,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
@Private
|
||||
public static class RMStateStoreOperationFailedEventDispatcher implements
|
||||
EventHandler<RMStateStoreOperationFailedEvent> {
|
||||
private final AdminService adminService;
|
||||
|
||||
public RMStateStoreOperationFailedEventDispatcher(
|
||||
AdminService adminService) {
|
||||
this.adminService = adminService;
|
||||
private final RMContext rmContext;
|
||||
private final ResourceManager rm;
|
||||
|
||||
public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext,
|
||||
ResourceManager resourceManager) {
|
||||
this.rmContext = rmContext;
|
||||
this.rm = resourceManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -671,16 +682,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
}
|
||||
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
|
||||
LOG.info("RMStateStore has been fenced");
|
||||
synchronized(adminService) {
|
||||
if (adminService.haEnabled) {
|
||||
try {
|
||||
// Transition to standby and reinit active services
|
||||
LOG.info("Transitioning RM to Standby mode");
|
||||
adminService.transitionToStandby(true);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to transition RM to Standby mode.");
|
||||
}
|
||||
if (rmContext.isHAEnabled()) {
|
||||
try {
|
||||
// Transition to standby and reinit active services
|
||||
LOG.info("Transitioning RM to Standby mode");
|
||||
rm.transitionToStandby(true);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to transition RM to Standby mode.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -826,10 +835,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
webApp = builder.start(new RMWebApp(this));
|
||||
}
|
||||
|
||||
void setConf(Configuration configuration) {
|
||||
conf = configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create and init {@link #activeServices}. This creates an
|
||||
* instance of {@link RMActiveServices} and initializes it.
|
||||
@ -870,6 +875,39 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
return activeServices != null && activeServices.isInState(STATE.STARTED);
|
||||
}
|
||||
|
||||
synchronized void transitionToActive() throws Exception {
|
||||
if (rmContext.getHAServiceState() ==
|
||||
HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
LOG.info("Already in active state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to active state");
|
||||
startActiveServices();
|
||||
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
|
||||
LOG.info("Transitioned to active state");
|
||||
}
|
||||
|
||||
synchronized void transitionToStandby(boolean initialize)
|
||||
throws Exception {
|
||||
if (rmContext.getHAServiceState() ==
|
||||
HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
LOG.info("Already in standby state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to standby state");
|
||||
if (rmContext.getHAServiceState() ==
|
||||
HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
stopActiveServices();
|
||||
if (initialize) {
|
||||
createAndInitActiveServices();
|
||||
}
|
||||
}
|
||||
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
|
||||
LOG.info("Transitioned to standby state");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
try {
|
||||
@ -877,6 +915,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
} catch(IOException ie) {
|
||||
throw new YarnRuntimeException("Failed to login", ie);
|
||||
}
|
||||
|
||||
if (this.rmContext.isHAEnabled()) {
|
||||
transitionToStandby(true);
|
||||
} else {
|
||||
transitionToActive();
|
||||
}
|
||||
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@ -888,6 +933,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
super.serviceStop();
|
||||
transitionToStandby(false);
|
||||
rmContext.setHAServiceState(HAServiceState.STOPPING);
|
||||
}
|
||||
|
||||
protected ResourceTrackerService createResourceTrackerService() {
|
||||
|
@ -22,6 +22,8 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import javax.security.auth.login.Configuration;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -88,7 +90,9 @@ public class TestRM {
|
||||
public void testAppOnMultiNode() throws Exception {
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
rootLogger.setLevel(Level.DEBUG);
|
||||
MockRM rm = new MockRM();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set("yarn.scheduler.capacity.node-locality-delay", "-1");
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||
MockNM nm2 = rm.registerNode("h2:5678", 10240);
|
||||
|
@ -1066,6 +1066,9 @@ public class TestLeafQueue {
|
||||
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
|
||||
|
||||
// node_1 heartbeats in and gets the DEFAULT_RACK request for app_1
|
||||
// We do not need locality delay here
|
||||
doReturn(-1).when(a).getNodeLocalityDelay();
|
||||
|
||||
a.assignContainers(clusterResource, node_1);
|
||||
assertEquals(10*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
@ -1649,7 +1652,7 @@ public class TestLeafQueue {
|
||||
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
|
||||
|
||||
// before reinitialization
|
||||
assertEquals(0, e.getNodeLocalityDelay());
|
||||
assertEquals(40, e.getNodeLocalityDelay());
|
||||
|
||||
csConf.setInt(CapacitySchedulerConfiguration
|
||||
.NODE_LOCALITY_DELAY, 60);
|
||||
@ -1932,10 +1935,10 @@ public class TestLeafQueue {
|
||||
|
||||
// Now, should allocate since RR(rack_1) = relax: true
|
||||
a.assignContainers(clusterResource, node_1_1);
|
||||
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
|
||||
verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
assertEquals(0, app_0.getSchedulingOpportunities(priority));
|
||||
assertEquals(0, app_0.getTotalRequiredResources(priority));
|
||||
assertEquals(1, app_0.getTotalRequiredResources(priority));
|
||||
|
||||
// Now sanity-check node_local
|
||||
app_0_requests_0.add(
|
||||
|
Loading…
x
Reference in New Issue
Block a user