YARN-1984. LeveldbTimelineStore does not handle db exceptions properly. Contributed by Varun Saxena
This commit is contained in:
parent
2967c17fef
commit
1ce4d33c2d
@ -135,6 +135,9 @@ Release 2.7.0 - UNRELEASED
|
|||||||
YARN-2315. FairScheduler: Set current capacity in addition to capacity.
|
YARN-2315. FairScheduler: Set current capacity in addition to capacity.
|
||||||
(Zhihai Xu via kasha)
|
(Zhihai Xu via kasha)
|
||||||
|
|
||||||
|
YARN-1984. LeveldbTimelineStore does not handle db exceptions properly
|
||||||
|
(Varun Saxena via jlowe)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -66,10 +66,10 @@
|
|||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||||
import org.fusesource.leveldbjni.JniDBFactory;
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
import org.iq80.leveldb.DB;
|
import org.iq80.leveldb.DB;
|
||||||
import org.iq80.leveldb.DBException;
|
import org.iq80.leveldb.DBException;
|
||||||
import org.iq80.leveldb.DBIterator;
|
|
||||||
import org.iq80.leveldb.Options;
|
import org.iq80.leveldb.Options;
|
||||||
import org.iq80.leveldb.ReadOptions;
|
import org.iq80.leveldb.ReadOptions;
|
||||||
import org.iq80.leveldb.WriteBatch;
|
import org.iq80.leveldb.WriteBatch;
|
||||||
@ -438,13 +438,15 @@ public TimelineEntity getEntity(String entityId, String entityType,
|
|||||||
.add(entityType).add(writeReverseOrderedLong(revStartTime))
|
.add(entityType).add(writeReverseOrderedLong(revStartTime))
|
||||||
.add(entityId).getBytesForLookup();
|
.add(entityId).getBytesForLookup();
|
||||||
|
|
||||||
DBIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
try {
|
try {
|
||||||
iterator = db.iterator();
|
iterator = new LeveldbIterator(db);
|
||||||
iterator.seek(prefix);
|
iterator.seek(prefix);
|
||||||
|
|
||||||
return getEntity(entityId, entityType, revStartTime, fields, iterator,
|
return getEntity(entityId, entityType, revStartTime, fields, iterator,
|
||||||
prefix, prefix.length);
|
prefix, prefix.length);
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, iterator);
|
IOUtils.cleanup(LOG, iterator);
|
||||||
}
|
}
|
||||||
@ -455,7 +457,7 @@ public TimelineEntity getEntity(String entityId, String entityType,
|
|||||||
* specified fields for this entity, return null.
|
* specified fields for this entity, return null.
|
||||||
*/
|
*/
|
||||||
private static TimelineEntity getEntity(String entityId, String entityType,
|
private static TimelineEntity getEntity(String entityId, String entityType,
|
||||||
Long startTime, EnumSet<Field> fields, DBIterator iterator,
|
Long startTime, EnumSet<Field> fields, LeveldbIterator iterator,
|
||||||
byte[] prefix, int prefixlen) throws IOException {
|
byte[] prefix, int prefixlen) throws IOException {
|
||||||
if (fields == null) {
|
if (fields == null) {
|
||||||
fields = EnumSet.allOf(Field.class);
|
fields = EnumSet.allOf(Field.class);
|
||||||
@ -562,7 +564,7 @@ public int compare(byte[] o1, byte[] o2) {
|
|||||||
o2.length);
|
o2.length);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
DBIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
try {
|
try {
|
||||||
// look up start times for the specified entities
|
// look up start times for the specified entities
|
||||||
// skip entities with no start time
|
// skip entities with no start time
|
||||||
@ -606,7 +608,7 @@ public int compare(byte[] o1, byte[] o2) {
|
|||||||
if (limit == null) {
|
if (limit == null) {
|
||||||
limit = DEFAULT_LIMIT;
|
limit = DEFAULT_LIMIT;
|
||||||
}
|
}
|
||||||
iterator = db.iterator();
|
iterator = new LeveldbIterator(db);
|
||||||
for (iterator.seek(first); entity.getEvents().size() < limit &&
|
for (iterator.seek(first); entity.getEvents().size() < limit &&
|
||||||
iterator.hasNext(); iterator.next()) {
|
iterator.hasNext(); iterator.next()) {
|
||||||
byte[] key = iterator.peekNext().getKey();
|
byte[] key = iterator.peekNext().getKey();
|
||||||
@ -623,6 +625,8 @@ public int compare(byte[] o1, byte[] o2) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, iterator);
|
IOUtils.cleanup(LOG, iterator);
|
||||||
}
|
}
|
||||||
@ -683,7 +687,7 @@ private TimelineEntities getEntityByTime(byte[] base,
|
|||||||
String entityType, Long limit, Long starttime, Long endtime,
|
String entityType, Long limit, Long starttime, Long endtime,
|
||||||
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
|
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
|
||||||
EnumSet<Field> fields) throws IOException {
|
EnumSet<Field> fields) throws IOException {
|
||||||
DBIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
try {
|
try {
|
||||||
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
|
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
|
||||||
// only db keys matching the prefix (base + entity type) will be parsed
|
// only db keys matching the prefix (base + entity type) will be parsed
|
||||||
@ -724,7 +728,7 @@ private TimelineEntities getEntityByTime(byte[] base,
|
|||||||
}
|
}
|
||||||
|
|
||||||
TimelineEntities entities = new TimelineEntities();
|
TimelineEntities entities = new TimelineEntities();
|
||||||
iterator = db.iterator();
|
iterator = new LeveldbIterator(db);
|
||||||
iterator.seek(first);
|
iterator.seek(first);
|
||||||
// iterate until one of the following conditions is met: limit is
|
// iterate until one of the following conditions is met: limit is
|
||||||
// reached, there are no more keys, the key prefix no longer matches,
|
// reached, there are no more keys, the key prefix no longer matches,
|
||||||
@ -783,11 +787,24 @@ private TimelineEntities getEntityByTime(byte[] base,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return entities;
|
return entities;
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, iterator);
|
IOUtils.cleanup(LOG, iterator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle error and set it in response.
|
||||||
|
*/
|
||||||
|
private static void handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode) {
|
||||||
|
TimelinePutError error = new TimelinePutError();
|
||||||
|
error.setEntityId(entity.getEntityId());
|
||||||
|
error.setEntityType(entity.getEntityType());
|
||||||
|
error.setErrorCode(errorCode);
|
||||||
|
response.addError(error);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Put a single entity. If there is an error, add a TimelinePutError to the
|
* Put a single entity. If there is an error, add a TimelinePutError to the
|
||||||
* given response.
|
* given response.
|
||||||
@ -812,11 +829,7 @@ private void put(TimelineEntity entity, TimelinePutResponse response,
|
|||||||
entity.getStartTime(), events);
|
entity.getStartTime(), events);
|
||||||
if (startAndInsertTime == null) {
|
if (startAndInsertTime == null) {
|
||||||
// if no start time is found, add an error and return
|
// if no start time is found, add an error and return
|
||||||
TimelinePutError error = new TimelinePutError();
|
handleError(entity, response, TimelinePutError.NO_START_TIME);
|
||||||
error.setEntityId(entity.getEntityId());
|
|
||||||
error.setEntityType(entity.getEntityType());
|
|
||||||
error.setErrorCode(TimelinePutError.NO_START_TIME);
|
|
||||||
response.addError(error);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
revStartTime = writeReverseOrderedLong(startAndInsertTime
|
revStartTime = writeReverseOrderedLong(startAndInsertTime
|
||||||
@ -883,11 +896,7 @@ private void put(TimelineEntity entity, TimelinePutResponse response,
|
|||||||
if (!domainId.equals(entity.getDomainId())) {
|
if (!domainId.equals(entity.getDomainId())) {
|
||||||
// in this case the entity will be put, but the relation will be
|
// in this case the entity will be put, but the relation will be
|
||||||
// ignored
|
// ignored
|
||||||
TimelinePutError error = new TimelinePutError();
|
handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION);
|
||||||
error.setEntityId(entity.getEntityId());
|
|
||||||
error.setEntityType(entity.getEntityType());
|
|
||||||
error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
|
|
||||||
response.addError(error);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -933,11 +942,7 @@ private void put(TimelineEntity entity, TimelinePutResponse response,
|
|||||||
if (entity.getDomainId() == null ||
|
if (entity.getDomainId() == null ||
|
||||||
entity.getDomainId().length() == 0) {
|
entity.getDomainId().length() == 0) {
|
||||||
if (!allowEmptyDomainId) {
|
if (!allowEmptyDomainId) {
|
||||||
TimelinePutError error = new TimelinePutError();
|
handleError(entity, response, TimelinePutError.NO_DOMAIN);
|
||||||
error.setEntityId(entity.getEntityId());
|
|
||||||
error.setEntityType(entity.getEntityType());
|
|
||||||
error.setErrorCode(TimelinePutError.NO_DOMAIN);
|
|
||||||
response.addError(error);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -946,14 +951,14 @@ private void put(TimelineEntity entity, TimelinePutResponse response,
|
|||||||
entity.getDomainId().getBytes());
|
entity.getDomainId().getBytes());
|
||||||
}
|
}
|
||||||
db.write(writeBatch);
|
db.write(writeBatch);
|
||||||
|
} catch (DBException de) {
|
||||||
|
LOG.error("Error putting entity " + entity.getEntityId() +
|
||||||
|
" of type " + entity.getEntityType(), de);
|
||||||
|
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Error putting entity " + entity.getEntityId() +
|
LOG.error("Error putting entity " + entity.getEntityId() +
|
||||||
" of type " + entity.getEntityType(), e);
|
" of type " + entity.getEntityType(), e);
|
||||||
TimelinePutError error = new TimelinePutError();
|
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
|
||||||
error.setEntityId(entity.getEntityId());
|
|
||||||
error.setEntityType(entity.getEntityType());
|
|
||||||
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
|
|
||||||
response.addError(error);
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
writeLocks.returnLock(lock);
|
writeLocks.returnLock(lock);
|
||||||
@ -983,15 +988,16 @@ private void put(TimelineEntity entity, TimelinePutResponse response,
|
|||||||
relatedEntity.getType(), relatedEntityStartTime),
|
relatedEntity.getType(), relatedEntityStartTime),
|
||||||
writeReverseOrderedLong(relatedEntityStartAndInsertTime
|
writeReverseOrderedLong(relatedEntityStartAndInsertTime
|
||||||
.insertTime));
|
.insertTime));
|
||||||
|
} catch (DBException de) {
|
||||||
|
LOG.error("Error putting related entity " + relatedEntity.getId() +
|
||||||
|
" of type " + relatedEntity.getType() + " for entity " +
|
||||||
|
entity.getEntityId() + " of type " + entity.getEntityType(), de);
|
||||||
|
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Error putting related entity " + relatedEntity.getId() +
|
LOG.error("Error putting related entity " + relatedEntity.getId() +
|
||||||
" of type " + relatedEntity.getType() + " for entity " +
|
" of type " + relatedEntity.getType() + " for entity " +
|
||||||
entity.getEntityId() + " of type " + entity.getEntityType(), e);
|
entity.getEntityId() + " of type " + entity.getEntityType(), e);
|
||||||
TimelinePutError error = new TimelinePutError();
|
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
|
||||||
error.setEntityId(entity.getEntityId());
|
|
||||||
error.setEntityType(entity.getEntityType());
|
|
||||||
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
|
|
||||||
response.addError(error);
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
writeLocks.returnLock(lock);
|
writeLocks.returnLock(lock);
|
||||||
@ -1072,23 +1078,27 @@ private byte[] getStartTime(String entityId, String entityType)
|
|||||||
private Long getStartTimeLong(String entityId, String entityType)
|
private Long getStartTimeLong(String entityId, String entityType)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
|
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
|
||||||
// start time is not provided, so try to look it up
|
try {
|
||||||
if (startTimeReadCache.containsKey(entity)) {
|
// start time is not provided, so try to look it up
|
||||||
// found the start time in the cache
|
if (startTimeReadCache.containsKey(entity)) {
|
||||||
return startTimeReadCache.get(entity);
|
// found the start time in the cache
|
||||||
} else {
|
return startTimeReadCache.get(entity);
|
||||||
// try to look up the start time in the db
|
|
||||||
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
|
|
||||||
byte[] v = db.get(b);
|
|
||||||
if (v == null) {
|
|
||||||
// did not find the start time in the db
|
|
||||||
return null;
|
|
||||||
} else {
|
} else {
|
||||||
// found the start time in the db
|
// try to look up the start time in the db
|
||||||
Long l = readReverseOrderedLong(v, 0);
|
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
|
||||||
startTimeReadCache.put(entity, l);
|
byte[] v = db.get(b);
|
||||||
return l;
|
if (v == null) {
|
||||||
|
// did not find the start time in the db
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
// found the start time in the db
|
||||||
|
Long l = readReverseOrderedLong(v, 0);
|
||||||
|
startTimeReadCache.put(entity, l);
|
||||||
|
return l;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1152,27 +1162,31 @@ private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity,
|
|||||||
StartAndInsertTime startAndInsertTime = null;
|
StartAndInsertTime startAndInsertTime = null;
|
||||||
// create lookup key for start time
|
// create lookup key for start time
|
||||||
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
|
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
|
||||||
// retrieve value for key
|
try {
|
||||||
byte[] v = db.get(b);
|
// retrieve value for key
|
||||||
if (v == null) {
|
byte[] v = db.get(b);
|
||||||
// start time doesn't exist in db
|
if (v == null) {
|
||||||
if (suggestedStartTime == null) {
|
// start time doesn't exist in db
|
||||||
return null;
|
if (suggestedStartTime == null) {
|
||||||
}
|
return null;
|
||||||
startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
|
}
|
||||||
System.currentTimeMillis());
|
startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
|
||||||
|
System.currentTimeMillis());
|
||||||
|
|
||||||
// write suggested start time
|
// write suggested start time
|
||||||
v = new byte[16];
|
v = new byte[16];
|
||||||
writeReverseOrderedLong(suggestedStartTime, v, 0);
|
writeReverseOrderedLong(suggestedStartTime, v, 0);
|
||||||
writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
|
writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
|
||||||
WriteOptions writeOptions = new WriteOptions();
|
WriteOptions writeOptions = new WriteOptions();
|
||||||
writeOptions.sync(true);
|
writeOptions.sync(true);
|
||||||
db.put(b, v, writeOptions);
|
db.put(b, v, writeOptions);
|
||||||
} else {
|
} else {
|
||||||
// found start time in db, so ignore suggested start time
|
// found start time in db, so ignore suggested start time
|
||||||
startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
|
startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
|
||||||
readReverseOrderedLong(v, 8));
|
readReverseOrderedLong(v, 8));
|
||||||
|
}
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
startTimeWriteCache.put(entity, startAndInsertTime);
|
startTimeWriteCache.put(entity, startAndInsertTime);
|
||||||
startTimeReadCache.put(entity, startAndInsertTime.startTime);
|
startTimeReadCache.put(entity, startAndInsertTime.startTime);
|
||||||
@ -1373,7 +1387,7 @@ static int getStartTimeWriteCacheSize(Configuration conf) {
|
|||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
List<String> getEntityTypes() throws IOException {
|
List<String> getEntityTypes() throws IOException {
|
||||||
DBIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
try {
|
try {
|
||||||
iterator = getDbIterator(false);
|
iterator = getDbIterator(false);
|
||||||
List<String> entityTypes = new ArrayList<String>();
|
List<String> entityTypes = new ArrayList<String>();
|
||||||
@ -1396,6 +1410,8 @@ List<String> getEntityTypes() throws IOException {
|
|||||||
iterator.seek(lookupKey);
|
iterator.seek(lookupKey);
|
||||||
}
|
}
|
||||||
return entityTypes;
|
return entityTypes;
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, iterator);
|
IOUtils.cleanup(LOG, iterator);
|
||||||
}
|
}
|
||||||
@ -1406,7 +1422,7 @@ List<String> getEntityTypes() throws IOException {
|
|||||||
* the given write batch.
|
* the given write batch.
|
||||||
*/
|
*/
|
||||||
private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
|
private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
|
||||||
DBIterator iterator) {
|
LeveldbIterator iterator) {
|
||||||
for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
|
for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
|
||||||
byte[] key = iterator.peekNext().getKey();
|
byte[] key = iterator.peekNext().getKey();
|
||||||
if (!prefixMatches(prefix, prefix.length, key)) {
|
if (!prefixMatches(prefix, prefix.length, key)) {
|
||||||
@ -1418,7 +1434,7 @@ private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
|
|||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
|
boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
|
||||||
DBIterator iterator, DBIterator pfIterator, boolean seeked)
|
LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
WriteBatch writeBatch = null;
|
WriteBatch writeBatch = null;
|
||||||
try {
|
try {
|
||||||
@ -1524,6 +1540,8 @@ boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
|
|||||||
writeOptions.sync(true);
|
writeOptions.sync(true);
|
||||||
db.write(writeBatch, writeOptions);
|
db.write(writeBatch, writeOptions);
|
||||||
return true;
|
return true;
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, writeBatch);
|
IOUtils.cleanup(LOG, writeBatch);
|
||||||
}
|
}
|
||||||
@ -1542,8 +1560,8 @@ void discardOldEntities(long timestamp)
|
|||||||
try {
|
try {
|
||||||
List<String> entityTypes = getEntityTypes();
|
List<String> entityTypes = getEntityTypes();
|
||||||
for (String entityType : entityTypes) {
|
for (String entityType : entityTypes) {
|
||||||
DBIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
DBIterator pfIterator = null;
|
LeveldbIterator pfIterator = null;
|
||||||
long typeCount = 0;
|
long typeCount = 0;
|
||||||
try {
|
try {
|
||||||
deleteLock.writeLock().lock();
|
deleteLock.writeLock().lock();
|
||||||
@ -1583,21 +1601,25 @@ void discardOldEntities(long timestamp)
|
|||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
DBIterator getDbIterator(boolean fillCache) {
|
LeveldbIterator getDbIterator(boolean fillCache) {
|
||||||
ReadOptions readOptions = new ReadOptions();
|
ReadOptions readOptions = new ReadOptions();
|
||||||
readOptions.fillCache(fillCache);
|
readOptions.fillCache(fillCache);
|
||||||
return db.iterator(readOptions);
|
return new LeveldbIterator(db, readOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
Version loadVersion() throws IOException {
|
Version loadVersion() throws IOException {
|
||||||
byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
|
try {
|
||||||
// if version is not stored previously, treat it as 1.0.
|
byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
|
||||||
if (data == null || data.length == 0) {
|
// if version is not stored previously, treat it as 1.0.
|
||||||
return Version.newInstance(1, 0);
|
if (data == null || data.length == 0) {
|
||||||
|
return Version.newInstance(1, 0);
|
||||||
|
}
|
||||||
|
Version version =
|
||||||
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
||||||
|
return version;
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
Version version =
|
|
||||||
new VersionPBImpl(VersionProto.parseFrom(data));
|
|
||||||
return version;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only used for test
|
// Only used for test
|
||||||
@ -1726,6 +1748,8 @@ public void put(TimelineDomain domain) throws IOException {
|
|||||||
writeBatch.put(domainEntryKey, timestamps);
|
writeBatch.put(domainEntryKey, timestamps);
|
||||||
writeBatch.put(ownerLookupEntryKey, timestamps);
|
writeBatch.put(ownerLookupEntryKey, timestamps);
|
||||||
db.write(writeBatch);
|
db.write(writeBatch);
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, writeBatch);
|
IOUtils.cleanup(LOG, writeBatch);
|
||||||
}
|
}
|
||||||
@ -1754,13 +1778,15 @@ private static byte[] createOwnerLookupKey(
|
|||||||
@Override
|
@Override
|
||||||
public TimelineDomain getDomain(String domainId)
|
public TimelineDomain getDomain(String domainId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
DBIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
try {
|
try {
|
||||||
byte[] prefix = KeyBuilder.newInstance()
|
byte[] prefix = KeyBuilder.newInstance()
|
||||||
.add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup();
|
.add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup();
|
||||||
iterator = db.iterator();
|
iterator = new LeveldbIterator(db);
|
||||||
iterator.seek(prefix);
|
iterator.seek(prefix);
|
||||||
return getTimelineDomain(iterator, domainId, prefix);
|
return getTimelineDomain(iterator, domainId, prefix);
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, iterator);
|
IOUtils.cleanup(LOG, iterator);
|
||||||
}
|
}
|
||||||
@ -1769,12 +1795,12 @@ public TimelineDomain getDomain(String domainId)
|
|||||||
@Override
|
@Override
|
||||||
public TimelineDomains getDomains(String owner)
|
public TimelineDomains getDomains(String owner)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
DBIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
try {
|
try {
|
||||||
byte[] prefix = KeyBuilder.newInstance()
|
byte[] prefix = KeyBuilder.newInstance()
|
||||||
.add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup();
|
.add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup();
|
||||||
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
|
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
|
||||||
for (iterator = db.iterator(), iterator.seek(prefix);
|
for (iterator = new LeveldbIterator(db), iterator.seek(prefix);
|
||||||
iterator.hasNext();) {
|
iterator.hasNext();) {
|
||||||
byte[] key = iterator.peekNext().getKey();
|
byte[] key = iterator.peekNext().getKey();
|
||||||
if (!prefixMatches(prefix, prefix.length, key)) {
|
if (!prefixMatches(prefix, prefix.length, key)) {
|
||||||
@ -1809,13 +1835,15 @@ public int compare(
|
|||||||
TimelineDomains domainsToReturn = new TimelineDomains();
|
TimelineDomains domainsToReturn = new TimelineDomains();
|
||||||
domainsToReturn.addDomains(domains);
|
domainsToReturn.addDomains(domains);
|
||||||
return domainsToReturn;
|
return domainsToReturn;
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, iterator);
|
IOUtils.cleanup(LOG, iterator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TimelineDomain getTimelineDomain(
|
private static TimelineDomain getTimelineDomain(
|
||||||
DBIterator iterator, String domainId, byte[] prefix) throws IOException {
|
LeveldbIterator iterator, String domainId, byte[] prefix) throws IOException {
|
||||||
// Iterate over all the rows whose key starts with prefix to retrieve the
|
// Iterate over all the rows whose key starts with prefix to retrieve the
|
||||||
// domain information.
|
// domain information.
|
||||||
TimelineDomain domain = new TimelineDomain();
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
@ -45,7 +45,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
||||||
import org.iq80.leveldb.DBIterator;
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||||
|
import org.iq80.leveldb.DBException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -146,13 +147,15 @@ public void testCacheSizes() {
|
|||||||
|
|
||||||
private boolean deleteNextEntity(String entityType, byte[] ts)
|
private boolean deleteNextEntity(String entityType, byte[] ts)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
DBIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
DBIterator pfIterator = null;
|
LeveldbIterator pfIterator = null;
|
||||||
try {
|
try {
|
||||||
iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
|
iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
|
||||||
pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
|
pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
|
||||||
return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
|
return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
|
||||||
iterator, pfIterator, false);
|
iterator, pfIterator, false);
|
||||||
|
} catch(DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(null, iterator, pfIterator);
|
IOUtils.cleanup(null, iterator, pfIterator);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user