YARN-2229. Changed the integer field of ContainerId to be long type. Contributed by Tsuyoshi OZAWA
This commit is contained in:
parent
78b048393a
commit
3122daa802
@ -194,7 +194,7 @@ public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
|
||||
Collections.sort(listOfCont, new Comparator<Container>() {
|
||||
@Override
|
||||
public int compare(final Container o1, final Container o2) {
|
||||
return o2.getId().getId() - o1.getId().getId();
|
||||
return o2.getId().compareTo(o1.getId());
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -211,6 +211,9 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2538. Added logs when RM sends roll-overed AMRMToken to AM. (Xuan Gong
|
||||
via zjshen)
|
||||
|
||||
YARN-2229. Changed the integer field of ContainerId to be long type.
|
||||
(Tsuyoshi OZAWA via jianhe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -41,9 +41,9 @@ public abstract class ContainerId implements Comparable<ContainerId>{
|
||||
@Private
|
||||
@Unstable
|
||||
public static ContainerId newInstance(ApplicationAttemptId appAttemptId,
|
||||
int containerId) {
|
||||
long containerId) {
|
||||
ContainerId id = Records.newRecord(ContainerId.class);
|
||||
id.setId(containerId);
|
||||
id.setContainerId(containerId);
|
||||
id.setApplicationAttemptId(appAttemptId);
|
||||
id.build();
|
||||
return id;
|
||||
@ -74,16 +74,28 @@ public static ContainerId newInstance(ApplicationAttemptId appAttemptId,
|
||||
protected abstract void setApplicationAttemptId(ApplicationAttemptId atId);
|
||||
|
||||
/**
|
||||
* Get the identifier of the <code>ContainerId</code>.
|
||||
* @return identifier of the <code>ContainerId</code>
|
||||
* Get the lower 32 bits of identifier of the <code>ContainerId</code>,
|
||||
* which doesn't include epoch. Note that this method will be marked as
|
||||
* deprecated, so please use <code>getContainerId</code> instead.
|
||||
* @return lower 32 bits of identifier of the <code>ContainerId</code>
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract int getId();
|
||||
|
||||
/**
|
||||
* Get the identifier of the <code>ContainerId</code>. Upper 24 bits are
|
||||
* reserved as epoch of cluster, and lower 40 bits are reserved as
|
||||
* sequential number of containers.
|
||||
* @return identifier of the <code>ContainerId</code>
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getContainerId();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
protected abstract void setId(int id);
|
||||
protected abstract void setContainerId(long id);
|
||||
|
||||
|
||||
// TODO: fail the app submission if attempts are more than 10 or something
|
||||
@ -112,11 +124,9 @@ public NumberFormat initialValue() {
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
// Generated by eclipse.
|
||||
final int prime = 435569;
|
||||
int result = 7507;
|
||||
result = prime * result + getId();
|
||||
result = prime * result + getApplicationAttemptId().hashCode();
|
||||
// Generated by IntelliJ IDEA 13.1.
|
||||
int result = (int) (getContainerId() ^ (getContainerId() >>> 32));
|
||||
result = 31 * result + getApplicationAttemptId().hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -131,7 +141,7 @@ public boolean equals(Object obj) {
|
||||
ContainerId other = (ContainerId) obj;
|
||||
if (!this.getApplicationAttemptId().equals(other.getApplicationAttemptId()))
|
||||
return false;
|
||||
if (this.getId() != other.getId())
|
||||
if (this.getContainerId() != other.getContainerId())
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
@ -140,12 +150,12 @@ public boolean equals(Object obj) {
|
||||
public int compareTo(ContainerId other) {
|
||||
if (this.getApplicationAttemptId().compareTo(
|
||||
other.getApplicationAttemptId()) == 0) {
|
||||
return this.getId() - other.getId();
|
||||
return Long.valueOf(getContainerId())
|
||||
.compareTo(Long.valueOf(other.getContainerId()));
|
||||
} else {
|
||||
return this.getApplicationAttemptId().compareTo(
|
||||
other.getApplicationAttemptId());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -159,8 +169,8 @@ public String toString() {
|
||||
sb.append(
|
||||
appAttemptIdAndEpochFormat.get().format(
|
||||
getApplicationAttemptId().getAttemptId())).append("_");
|
||||
sb.append(containerIdFormat.get().format(0x3fffff & getId()));
|
||||
int epoch = getId() >> 22;
|
||||
sb.append(containerIdFormat.get().format(0xffffffffffL & getContainerId()));
|
||||
long epoch = getContainerId() >> 40;
|
||||
if (epoch > 0) {
|
||||
sb.append("_").append(appAttemptIdAndEpochFormat.get().format(epoch));
|
||||
}
|
||||
@ -177,12 +187,12 @@ public static ContainerId fromString(String containerIdStr) {
|
||||
}
|
||||
try {
|
||||
ApplicationAttemptId appAttemptID = toApplicationAttemptId(it);
|
||||
int id = Integer.parseInt(it.next());
|
||||
int epoch = 0;
|
||||
long id = Long.parseLong(it.next());
|
||||
long epoch = 0;
|
||||
if (it.hasNext()) {
|
||||
epoch = Integer.parseInt(it.next());
|
||||
}
|
||||
int cid = (epoch << 22) | id;
|
||||
long cid = (epoch << 40) | id;
|
||||
ContainerId containerId = ContainerId.newInstance(appAttemptID, cid);
|
||||
return containerId;
|
||||
} catch (NumberFormatException n) {
|
||||
|
@ -50,7 +50,7 @@ message ApplicationAttemptIdProto {
|
||||
message ContainerIdProto {
|
||||
optional ApplicationIdProto app_id = 1;
|
||||
optional ApplicationAttemptIdProto app_attempt_id = 2;
|
||||
optional int32 id = 3;
|
||||
optional int64 id = 3;
|
||||
}
|
||||
|
||||
message ResourceProto {
|
||||
|
@ -49,12 +49,18 @@ public ContainerIdProto getProto() {
|
||||
|
||||
@Override
|
||||
public int getId() {
|
||||
Preconditions.checkNotNull(proto);
|
||||
return (int) proto.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getContainerId() {
|
||||
Preconditions.checkNotNull(proto);
|
||||
return proto.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setId(int id) {
|
||||
protected void setContainerId(long id) {
|
||||
Preconditions.checkNotNull(builder);
|
||||
builder.setId((id));
|
||||
}
|
||||
|
@ -54,18 +54,26 @@ public void testContainerId() {
|
||||
long ts = System.currentTimeMillis();
|
||||
ContainerId c6 = newContainerId(36473, 4365472, ts, 25645811);
|
||||
Assert.assertEquals("container_10_0001_01_000001", c1.toString());
|
||||
Assert.assertEquals(c1,
|
||||
ContainerId.fromString("container_10_0001_01_000001"));
|
||||
Assert.assertEquals(479987, 0x003fffff & c6.getId());
|
||||
Assert.assertEquals(6, c6.getId() >> 22);
|
||||
Assert.assertEquals("container_" + ts + "_36473_4365472_479987_06",
|
||||
Assert.assertEquals(25645811, 0xffffffffffL & c6.getContainerId());
|
||||
Assert.assertEquals(0, c6.getContainerId() >> 40);
|
||||
Assert.assertEquals("container_" + ts + "_36473_4365472_25645811",
|
||||
c6.toString());
|
||||
Assert.assertEquals(c6,
|
||||
ContainerId.fromString("container_" + ts + "_36473_4365472_479987_06"));
|
||||
|
||||
ContainerId c7 = newContainerId(36473, 4365472, ts, 4298334883325L);
|
||||
Assert.assertEquals(999799999997L, 0xffffffffffL & c7.getContainerId());
|
||||
Assert.assertEquals(3, c7.getContainerId() >> 40);
|
||||
Assert.assertEquals(
|
||||
"container_" + ts + "_36473_4365472_999799999997_03", c7.toString());
|
||||
|
||||
ContainerId c8 = newContainerId(36473, 4365472, ts, 844424930131965L);
|
||||
Assert.assertEquals(1099511627773L, 0xffffffffffL & c8.getContainerId());
|
||||
Assert.assertEquals(767, c8.getContainerId() >> 40);
|
||||
Assert.assertEquals(
|
||||
"container_" + ts + "_36473_4365472_1099511627773_767", c8.toString());
|
||||
}
|
||||
|
||||
public static ContainerId newContainerId(int appId, int appAttemptId,
|
||||
long timestamp, int containerId) {
|
||||
long timestamp, long containerId) {
|
||||
ApplicationId applicationId = ApplicationId.newInstance(timestamp, appId);
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
ApplicationAttemptId.newInstance(applicationId, appAttemptId);
|
||||
|
@ -59,9 +59,24 @@ public void testContainerId() throws URISyntaxException {
|
||||
public void testContainerIdWithEpoch() throws URISyntaxException {
|
||||
ContainerId id = TestContainerId.newContainerId(0, 0, 0, 25645811);
|
||||
String cid = ConverterUtils.toString(id);
|
||||
assertEquals("container_0_0000_00_479987_06", cid);
|
||||
assertEquals("container_0_0000_00_25645811", cid);
|
||||
ContainerId gen = ConverterUtils.toContainerId(cid);
|
||||
assertEquals(gen.toString(), id.toString());
|
||||
|
||||
long ts = System.currentTimeMillis();
|
||||
ContainerId id2 =
|
||||
TestContainerId.newContainerId(36473, 4365472, ts, 4298334883325L);
|
||||
String cid2 = ConverterUtils.toString(id2);
|
||||
assertEquals("container_" + ts + "_36473_4365472_999799999997_03", cid2);
|
||||
ContainerId gen2 = ConverterUtils.toContainerId(cid2);
|
||||
assertEquals(gen2.toString(), id2.toString());
|
||||
|
||||
ContainerId id3 =
|
||||
TestContainerId.newContainerId(36473, 4365472, ts, 844424930131965L);
|
||||
String cid3 = ConverterUtils.toString(id3);
|
||||
assertEquals("container_" + ts + "_36473_4365472_1099511627773_767", cid3);
|
||||
ContainerId gen3 = ConverterUtils.toContainerId(cid3);
|
||||
assertEquals(gen3.toString(), id3.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -138,12 +138,12 @@ public static ApplicationId convert(long clustertimestamp, CharSequence id) {
|
||||
}
|
||||
|
||||
public static ContainerId newContainerId(ApplicationAttemptId appAttemptId,
|
||||
int containerId) {
|
||||
long containerId) {
|
||||
return ContainerId.newInstance(appAttemptId, containerId);
|
||||
}
|
||||
|
||||
public static ContainerId newContainerId(int appId, int appAttemptId,
|
||||
long timestamp, int id) {
|
||||
long timestamp, long id) {
|
||||
ApplicationId applicationId = newApplicationId(timestamp, appId);
|
||||
ApplicationAttemptId applicationAttemptId = newApplicationAttemptId(
|
||||
applicationId, appAttemptId);
|
||||
|
@ -107,5 +107,5 @@ void setRMApplicationHistoryWriter(
|
||||
|
||||
boolean isWorkPreservingRecoveryEnabled();
|
||||
|
||||
int getEpoch();
|
||||
long getEpoch();
|
||||
}
|
@ -84,7 +84,7 @@ public class RMContextImpl implements RMContext {
|
||||
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
|
||||
private SystemMetricsPublisher systemMetricsPublisher;
|
||||
private ConfigurationProvider configurationProvider;
|
||||
private int epoch;
|
||||
private long epoch;
|
||||
|
||||
/**
|
||||
* Default constructor. To be used in conjunction with setter methods for
|
||||
@ -375,11 +375,11 @@ public void setConfigurationProvider(
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEpoch() {
|
||||
public long getEpoch() {
|
||||
return this.epoch;
|
||||
}
|
||||
|
||||
void setEpoch(int epoch) {
|
||||
void setEpoch(long epoch) {
|
||||
this.epoch = epoch;
|
||||
}
|
||||
}
|
@ -602,8 +602,7 @@ public int compare(RMContainer a, RMContainer b) {
|
||||
if (priorityComp != 0) {
|
||||
return priorityComp;
|
||||
}
|
||||
return b.getContainerId().getId() -
|
||||
a.getContainerId().getId();
|
||||
return b.getContainerId().compareTo(a.getContainerId());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -165,9 +165,9 @@ protected synchronized void storeVersion() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getAndIncrementEpoch() throws Exception {
|
||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
|
||||
int currentEpoch = 0;
|
||||
long currentEpoch = 0;
|
||||
if (fs.exists(epochNodePath)) {
|
||||
// load current epoch
|
||||
FileStatus status = fs.getFileStatus(epochNodePath);
|
||||
|
@ -44,7 +44,7 @@
|
||||
public class MemoryRMStateStore extends RMStateStore {
|
||||
|
||||
RMState state = new RMState();
|
||||
private int epoch = 0;
|
||||
private long epoch = 0L;
|
||||
|
||||
@VisibleForTesting
|
||||
public RMState getState() {
|
||||
@ -56,8 +56,8 @@ public void checkVersion() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getAndIncrementEpoch() throws Exception {
|
||||
int currentEpoch = epoch;
|
||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||
long currentEpoch = epoch;
|
||||
epoch = epoch + 1;
|
||||
return currentEpoch;
|
||||
}
|
||||
|
@ -49,8 +49,8 @@ protected void closeInternal() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getAndIncrementEpoch() throws Exception {
|
||||
return 0;
|
||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -545,7 +545,7 @@ public void checkVersion() throws Exception {
|
||||
/**
|
||||
* Get the current epoch of RM and increment the value.
|
||||
*/
|
||||
public abstract int getAndIncrementEpoch() throws Exception;
|
||||
public abstract long getAndIncrementEpoch() throws Exception;
|
||||
|
||||
/**
|
||||
* Blocking API
|
||||
|
@ -412,9 +412,9 @@ protected synchronized Version loadVersion() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getAndIncrementEpoch() throws Exception {
|
||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
|
||||
int currentEpoch = 0;
|
||||
long currentEpoch = 0;
|
||||
if (existsWithRetries(epochNodePath, true) != null) {
|
||||
// load current epoch
|
||||
byte[] data = getDataWithRetries(epochNodePath, true);
|
||||
|
@ -32,15 +32,15 @@
|
||||
@Unstable
|
||||
public abstract class Epoch {
|
||||
|
||||
public static Epoch newInstance(int sequenceNumber) {
|
||||
public static Epoch newInstance(long sequenceNumber) {
|
||||
Epoch epoch = Records.newRecord(Epoch.class);
|
||||
epoch.setEpoch(sequenceNumber);
|
||||
return epoch;
|
||||
}
|
||||
|
||||
public abstract int getEpoch();
|
||||
public abstract long getEpoch();
|
||||
|
||||
public abstract void setEpoch(int sequenceNumber);
|
||||
public abstract void setEpoch(long sequenceNumber);
|
||||
|
||||
public abstract EpochProto getProto();
|
||||
|
||||
@ -50,10 +50,7 @@ public String toString() {
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + getEpoch();
|
||||
return result;
|
||||
return (int) (getEpoch() ^ (getEpoch() >>> 32));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -53,13 +53,13 @@ private void maybeInitBuilder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEpoch() {
|
||||
public long getEpoch() {
|
||||
EpochProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (int) (p.getEpoch() & 0xffffffff);
|
||||
return p.getEpoch();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEpoch(int sequentialNumber) {
|
||||
public void setEpoch(long sequentialNumber) {
|
||||
maybeInitBuilder();
|
||||
builder.setEpoch(sequentialNumber);
|
||||
}
|
||||
|
@ -26,7 +26,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -58,9 +58,8 @@ public class AppSchedulingInfo {
|
||||
Queue queue;
|
||||
final String user;
|
||||
// TODO making containerIdCounter long
|
||||
private final AtomicInteger containerIdCounter;
|
||||
private final int EPOCH_BIT_MASK = 0x3ff;
|
||||
private final int EPOCH_BIT_SHIFT = 22;
|
||||
private final AtomicLong containerIdCounter;
|
||||
private final int EPOCH_BIT_SHIFT = 40;
|
||||
|
||||
final Set<Priority> priorities = new TreeSet<Priority>(
|
||||
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
|
||||
@ -77,15 +76,14 @@ public class AppSchedulingInfo {
|
||||
|
||||
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
|
||||
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
||||
int epoch) {
|
||||
long epoch) {
|
||||
this.applicationAttemptId = appAttemptId;
|
||||
this.applicationId = appAttemptId.getApplicationId();
|
||||
this.queue = queue;
|
||||
this.queueName = queue.getQueueName();
|
||||
this.user = user;
|
||||
this.activeUsersManager = activeUsersManager;
|
||||
this.containerIdCounter = new AtomicInteger(
|
||||
(epoch & EPOCH_BIT_MASK) << EPOCH_BIT_SHIFT);
|
||||
this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
|
||||
}
|
||||
|
||||
public ApplicationId getApplicationId() {
|
||||
@ -117,7 +115,7 @@ private synchronized void clearRequests() {
|
||||
LOG.info("Application " + applicationId + " requests cleared");
|
||||
}
|
||||
|
||||
public int getNewContainerId() {
|
||||
public long getNewContainerId() {
|
||||
return this.containerIdCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
|
@ -182,7 +182,7 @@ public Set<ContainerId> getPendingRelease() {
|
||||
return this.pendingRelease;
|
||||
}
|
||||
|
||||
public int getNewContainerId() {
|
||||
public long getNewContainerId() {
|
||||
return appSchedulingInfo.getNewContainerId();
|
||||
}
|
||||
|
||||
|
@ -238,7 +238,7 @@ public void testSchedulerRecovery() throws Exception {
|
||||
}
|
||||
|
||||
// *********** check appSchedulingInfo state ***********
|
||||
assertEquals((1 << 22) + 1, schedulerAttempt.getNewContainerId());
|
||||
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
|
||||
}
|
||||
|
||||
private void checkCSQueue(MockRM rm,
|
||||
|
@ -508,13 +508,13 @@ public void testEpoch(RMStateStoreHelper stateStoreHelper)
|
||||
RMStateStore store = stateStoreHelper.getRMStateStore();
|
||||
store.setRMDispatcher(new TestDispatcher());
|
||||
|
||||
int firstTimeEpoch = store.getAndIncrementEpoch();
|
||||
long firstTimeEpoch = store.getAndIncrementEpoch();
|
||||
Assert.assertEquals(0, firstTimeEpoch);
|
||||
|
||||
int secondTimeEpoch = store.getAndIncrementEpoch();
|
||||
long secondTimeEpoch = store.getAndIncrementEpoch();
|
||||
Assert.assertEquals(1, secondTimeEpoch);
|
||||
|
||||
int thirdTimeEpoch = store.getAndIncrementEpoch();
|
||||
long thirdTimeEpoch = store.getAndIncrementEpoch();
|
||||
Assert.assertEquals(2, thirdTimeEpoch);
|
||||
}
|
||||
|
||||
|
@ -63,13 +63,13 @@ public void testMove() {
|
||||
|
||||
ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
when(rmContext.getEpoch()).thenReturn(3);
|
||||
when(rmContext.getEpoch()).thenReturn(3L);
|
||||
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
|
||||
user, oldQueue, oldQueue.getActiveUsersManager(), rmContext);
|
||||
oldMetrics.submitApp(user);
|
||||
|
||||
// confirm that containerId is calculated based on epoch.
|
||||
assertEquals(app.getNewContainerId(), 0x00c00001);
|
||||
assertEquals(0x30000000001L, app.getNewContainerId());
|
||||
|
||||
// Resource request
|
||||
Resource requestedResource = Resource.newInstance(1536, 2);
|
||||
|
@ -172,7 +172,9 @@ public static ContainerId getMockContainerId(FiCaSchedulerApp application) {
|
||||
ContainerId containerId = mock(ContainerId.class);
|
||||
doReturn(application.getApplicationAttemptId()).
|
||||
when(containerId).getApplicationAttemptId();
|
||||
doReturn(application.getNewContainerId()).when(containerId).getId();
|
||||
long id = application.getNewContainerId();
|
||||
doReturn((int)id).when(containerId).getId();
|
||||
doReturn(id).when(containerId).getContainerId();
|
||||
return containerId;
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,7 @@ public void setup() throws Exception {
|
||||
maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler);
|
||||
appNum = 0;
|
||||
rmContext = mock(RMContext.class);
|
||||
when(rmContext.getEpoch()).thenReturn(0);
|
||||
when(rmContext.getEpoch()).thenReturn(0L);
|
||||
}
|
||||
|
||||
private FSAppAttempt addApp(FSLeafQueue queue, String user) {
|
||||
|
Loading…
Reference in New Issue
Block a user