diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c1f430a008..4500097b8c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -45,6 +45,9 @@ Release 2.5.0 - UNRELEASED YARN-1365. Changed ApplicationMasterService to allow an app to re-register after RM restart. (Anubhav Dhoot via jianhe) + YARN-2052. Embedded an epoch number in container id to ensure the uniqueness + of container id after RM restarts. (Tsuyoshi OZAWA via jianhe) + IMPROVEMENTS YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index db86d394e9..2eb6148750 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -135,6 +135,10 @@ message RMStateVersionProto { optional int32 minor_version = 2; } +message EpochProto { + optional int64 epoch = 1; +} + ////////////////////////////////////////////////////////////////// ///////////// RM Failover related records //////////////////////// ////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 517e680252..01d506444f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -101,4 +101,6 @@ void setRMApplicationHistoryWriter( ConfigurationProvider getConfigurationProvider(); boolean isWorkPreservingRecoveryEnabled(); + + int getEpoch(); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1abc660a82..f72ef30c01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -82,6 +82,7 @@ public class RMContextImpl implements RMContext { private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private ConfigurationProvider configurationProvider; + private int epoch; /** * Default constructor. To be used in conjunction with setter methods for @@ -359,4 +360,13 @@ public void setConfigurationProvider( ConfigurationProvider configurationProvider) { this.configurationProvider = configurationProvider; } + + @Override + public int getEpoch() { + return this.epoch; + } + + void setEpoch(int epoch) { + this.epoch = epoch; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 77de2090a0..c921ae9f2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -482,6 +482,9 @@ protected void serviceStart() throws Exception { if(recoveryEnabled) { try { rmStore.checkVersion(); + if (rmContext.isWorkPreservingRecoveryEnabled()) { + rmContext.setEpoch(rmStore.getAndIncrementEpoch()); + } RMState state = rmStore.loadState(); recover(state); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 37f08cf17b..b315a84859 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -43,15 +43,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -71,7 +75,7 @@ public class FileSystemRMStateStore extends RMStateStore { protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion - .newInstance(1, 0); + .newInstance(1, 1); protected FileSystem fs; @@ -145,7 +149,30 @@ protected synchronized void storeVersion() throws Exception { writeFile(versionNodePath, data); } } - + + @Override + public synchronized int getAndIncrementEpoch() throws Exception { + Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); + int currentEpoch = 0; + if (fs.exists(epochNodePath)) { + // load current epoch + FileStatus status = fs.getFileStatus(epochNodePath); + byte[] data = readFile(epochNodePath, status.getLen()); + Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); + currentEpoch = epoch.getEpoch(); + // increment epoch and store it + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + updateFile(epochNodePath, storeData); + } else { + // initialize epoch file with 1 for the next time. + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + writeFile(epochNodePath, storeData); + } + return currentEpoch; + } + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index fb0ce1a5b4..6b5b602381 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -43,6 +43,8 @@ public class MemoryRMStateStore extends RMStateStore { RMState state = new RMState(); + private int epoch = 0; + @VisibleForTesting public RMState getState() { return state; @@ -52,6 +54,13 @@ public RMState getState() { public void checkVersion() throws Exception { } + @Override + public synchronized int getAndIncrementEpoch() throws Exception { + int currentEpoch = epoch; + epoch = epoch + 1; + return currentEpoch; + } + @Override public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 6a0426c0e8..603d020f55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -47,6 +47,11 @@ protected void closeInternal() throws Exception { // Do nothing } + @Override + public synchronized int getAndIncrementEpoch() throws Exception { + return 0; + } + @Override public RMState loadState() throws Exception { throw new UnsupportedOperationException("Cannot load state from null store"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index b18a8748b9..9b05ea1fdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; @@ -85,6 +86,7 @@ public abstract class RMStateStore extends AbstractService { protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = "RMDTSequenceNumber_"; protected static final String VERSION_NODE = "RMVersionNode"; + protected static final String EPOCH_NODE = "EpochNode"; public static final Log LOG = LogFactory.getLog(RMStateStore.class); @@ -520,6 +522,12 @@ public void checkVersion() throws Exception { */ protected abstract RMStateVersion getCurrentVersion(); + + /** + * Get the current epoch of RM and increment the value. + */ + public abstract int getAndIncrementEpoch() throws Exception; + /** * Blocking API * The derived class must recover state from the store and return a new diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 9ff128d790..01bca39ad0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -44,16 +44,21 @@ import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; @@ -81,7 +86,7 @@ public class ZKRMStateStore extends RMStateStore { protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion - .newInstance(1, 0); + .newInstance(1, 1); private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = @@ -102,6 +107,7 @@ public class ZKRMStateStore extends RMStateStore { * * ROOT_DIR_PATH * |--- VERSION_INFO + * |--- EPOCH_NODE * |--- RM_ZK_FENCING_LOCK * |--- RM_APP_ROOT * | |----- (#ApplicationId1) @@ -391,6 +397,28 @@ protected synchronized RMStateVersion loadVersion() throws Exception { return null; } + @Override + public synchronized int getAndIncrementEpoch() throws Exception { + String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); + int currentEpoch = 0; + if (existsWithRetries(epochNodePath, true) != null) { + // load current epoch + byte[] data = getDataWithRetries(epochNodePath, true); + Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); + currentEpoch = epoch.getEpoch(); + // increment epoch and store it + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + setDataWithRetries(epochNodePath, storeData, -1); + } else { + // initialize epoch node with 1 for the next time. + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT); + } + return currentEpoch; + } + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/Epoch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/Epoch.java new file mode 100644 index 0000000000..066878918d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/Epoch.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; +import org.apache.hadoop.yarn.util.Records; + +/** + * The epoch information of RM for work-preserving restart. + * Epoch is incremented each time RM restart. It's used for assuring + * uniqueness of ContainerId. + */ +@Private +@Unstable +public abstract class Epoch { + + public static Epoch newInstance(int sequenceNumber) { + Epoch epoch = Records.newRecord(Epoch.class); + epoch.setEpoch(sequenceNumber); + return epoch; + } + + public abstract int getEpoch(); + + public abstract void setEpoch(int sequenceNumber); + + public abstract EpochProto getProto(); + + public String toString() { + return String.valueOf(getEpoch()); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getEpoch(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Epoch other = (Epoch) obj; + if (this.getEpoch() == other.getEpoch()) { + return true; + } else { + return false; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/EpochPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/EpochPBImpl.java new file mode 100644 index 0000000000..4430672d07 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/EpochPBImpl.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProtoOrBuilder; + + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; + +public class EpochPBImpl extends Epoch { + + EpochProto proto = EpochProto.getDefaultInstance(); + EpochProto.Builder builder = null; + boolean viaProto = false; + + public EpochPBImpl() { + builder = EpochProto.newBuilder(); + } + + public EpochPBImpl(EpochProto proto) { + this.proto = proto; + viaProto = true; + } + + public EpochProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = EpochProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getEpoch() { + EpochProtoOrBuilder p = viaProto ? proto : builder; + return (int) (p.getEpoch() & 0xffffffff); + } + + @Override + public void setEpoch(int sequentialNumber) { + maybeInitBuilder(); + builder.setEpoch(sequentialNumber); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index d3d03fdd20..581321ca35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -57,7 +57,10 @@ public class AppSchedulingInfo { private final String queueName; Queue queue; final String user; - private final AtomicInteger containerIdCounter = new AtomicInteger(0); + // TODO making containerIdCounter long + private final AtomicInteger containerIdCounter; + private final int EPOCH_BIT_MASK = 0x3ff; + private final int EPOCH_BIT_SHIFT = 22; final Set priorities = new TreeSet( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); @@ -70,15 +73,19 @@ public class AppSchedulingInfo { /* Allocated by scheduler */ boolean pending = true; // for app metrics - + + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager) { + String user, Queue queue, ActiveUsersManager activeUsersManager, + int epoch) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; + this.containerIdCounter = new AtomicInteger( + (epoch & EPOCH_BIT_MASK) << EPOCH_BIT_SHIFT); } public ApplicationId getApplicationId() { @@ -413,9 +420,6 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo( } public synchronized void recoverContainer(RMContainer rmContainer) { - // ContainerIdCounter on recovery will be addressed in YARN-2052 - this.containerIdCounter.incrementAndGet(); - QueueMetrics metrics = queue.getMetrics(); if (pending) { // If there was any container to recover, the application was diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index cf9e96258b..3a51417cdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -106,13 +108,14 @@ public class SchedulerApplicationAttempt { public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { + Preconditions.checkNotNull("RMContext should not be null", rmContext); this.rmContext = rmContext; this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); + activeUsersManager, rmContext.getEpoch()); this.queue = queue; - - if (rmContext != null && rmContext.getRMApps() != null && + + if (rmContext.getRMApps() != null && rmContext.getRMApps() .containsKey(applicationAttemptId.getApplicationId())) { ApplicationSubmissionContext appSubmissionContext = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 89342afc97..90883ec1a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -218,7 +218,7 @@ public void testSchedulerRecovery() throws Exception { assertEquals(availableResources, schedulerAttempt.getHeadroom()); // *********** check appSchedulingInfo state *********** - assertEquals(4, schedulerAttempt.getNewContainerId()); + assertEquals((1 << 22) + 1, schedulerAttempt.getNewContainerId()); } private void checkCSQueue(MockRM rm, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 00f5f57063..49d7135590 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -495,6 +495,21 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper) Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); } } + + public void testEpoch(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + + int firstTimeEpoch = store.getAndIncrementEpoch(); + Assert.assertEquals(0, firstTimeEpoch); + + int secondTimeEpoch = store.getAndIncrementEpoch(); + Assert.assertEquals(1, secondTimeEpoch); + + int thirdTimeEpoch = store.getAndIncrementEpoch(); + Assert.assertEquals(2, thirdTimeEpoch); + } public void testAppDeletion(RMStateStoreHelper stateStoreHelper) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index da25c5beda..6ccaeaeaf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -158,6 +158,7 @@ public void testFSRMStateStore() throws Exception { .getFileSystem(conf).exists(tempAppAttemptFile)); testRMDTSecretManagerStateStore(fsTester); testCheckVersion(fsTester); + testEpoch(fsTester); testAppDeletion(fsTester); } finally { cluster.shutdown(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 284794b373..d3a5475ab3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -120,6 +120,7 @@ public void testZKRMStateStoreRealZK() throws Exception { testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); testCheckVersion(zkTester); + testEpoch(zkTester); testAppDeletion(zkTester); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index 93fd30027e..20a4aa8b6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; @@ -61,10 +62,15 @@ public void testMove() { QueueMetrics newMetrics = newQueue.getMetrics(); ApplicationAttemptId appAttId = createAppAttemptId(0, 0); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getEpoch()).thenReturn(3); SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, - user, oldQueue, oldQueue.getActiveUsersManager(), null); + user, oldQueue, oldQueue.getActiveUsersManager(), rmContext); oldMetrics.submitApp(user); + // confirm that containerId is calculated based on epoch. + assertEquals(app.getNewContainerId(), 0x00c00001); + // Resource request Resource requestedResource = Resource.newInstance(1536, 2); Priority requestedPriority = Priority.newInstance(2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java index c651cb66bd..2d5a6d4bc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.util.Clock; import org.junit.Test; @@ -59,8 +60,11 @@ public void testDelayScheduling() { double rackLocalityThreshold = .6; ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getEpoch()).thenReturn(0); FSSchedulerApp schedulerApp = - new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null); + new FSSchedulerApp(applicationAttemptId, "user1", queue , null, + rmContext); // Default level should be node-local assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( @@ -118,10 +122,12 @@ public void testDelaySchedulingForContinuousScheduling() long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds long rackLocalityDelayMs = 6 * 1000L; // 6 seconds + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getEpoch()).thenReturn(0); ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); FSSchedulerApp schedulerApp = new FSSchedulerApp(applicationAttemptId, "user1", queue, - null, null); + null, rmContext); AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class); long startTime = clock.getTime(); Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime); @@ -173,9 +179,12 @@ public void testLocalityLevelWithoutDelays() { Priority prio = Mockito.mock(Priority.class); Mockito.when(prio.getPriority()).thenReturn(1); + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getEpoch()).thenReturn(0); ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); FSSchedulerApp schedulerApp = - new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null); + new FSSchedulerApp(applicationAttemptId, "user1", queue , null, + rmContext); assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( prio, 10, -1.0, -1.0)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java index c1866f01cd..cc738f5eb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.junit.Before; @@ -40,6 +41,7 @@ public class TestMaxRunningAppsEnforcer { private MaxRunningAppsEnforcer maxAppsEnforcer; private int appNum; private TestFairScheduler.MockClock clock; + private RMContext rmContext; @Before public void setup() throws Exception { @@ -59,13 +61,16 @@ public void setup() throws Exception { userMaxApps = allocConf.userMaxApps; maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler); appNum = 0; + rmContext = mock(RMContext.class); + when(rmContext.getEpoch()).thenReturn(0); } private FSSchedulerApp addApp(FSLeafQueue queue, String user) { ApplicationId appId = ApplicationId.newInstance(0l, appNum++); ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); - FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null); + FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, + rmContext); queue.addApp(app, runnable); if (runnable) { maxAppsEnforcer.trackRunnableApp(app);