diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6a29d19e6b..5d1e10779b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -135,6 +135,9 @@ Release 2.7.0 - UNRELEASED YARN-2315. FairScheduler: Set current capacity in addition to capacity. (Zhihai Xu via kasha) + YARN-1984. LeveldbTimelineStore does not handle db exceptions properly + (Varun Saxena via jlowe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java index c4ea9960ad..33deb80453 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java @@ -66,10 +66,10 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBException; -import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.Options; import org.iq80.leveldb.ReadOptions; import org.iq80.leveldb.WriteBatch; @@ -438,13 +438,15 @@ public TimelineEntity getEntity(String entityId, String entityType, .add(entityType).add(writeReverseOrderedLong(revStartTime)) .add(entityId).getBytesForLookup(); - DBIterator iterator = null; + LeveldbIterator iterator = null; try { - iterator = db.iterator(); + iterator = new LeveldbIterator(db); iterator.seek(prefix); return getEntity(entityId, entityType, revStartTime, fields, iterator, prefix, prefix.length); + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(LOG, iterator); } @@ -455,7 +457,7 @@ public TimelineEntity getEntity(String entityId, String entityType, * specified fields for this entity, return null. */ private static TimelineEntity getEntity(String entityId, String entityType, - Long startTime, EnumSet fields, DBIterator iterator, + Long startTime, EnumSet fields, LeveldbIterator iterator, byte[] prefix, int prefixlen) throws IOException { if (fields == null) { fields = EnumSet.allOf(Field.class); @@ -562,7 +564,7 @@ public int compare(byte[] o1, byte[] o2) { o2.length); } }); - DBIterator iterator = null; + LeveldbIterator iterator = null; try { // look up start times for the specified entities // skip entities with no start time @@ -606,7 +608,7 @@ public int compare(byte[] o1, byte[] o2) { if (limit == null) { limit = DEFAULT_LIMIT; } - iterator = db.iterator(); + iterator = new LeveldbIterator(db); for (iterator.seek(first); entity.getEvents().size() < limit && iterator.hasNext(); iterator.next()) { byte[] key = iterator.peekNext().getKey(); @@ -623,6 +625,8 @@ public int compare(byte[] o1, byte[] o2) { } } } + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(LOG, iterator); } @@ -683,7 +687,7 @@ private TimelineEntities getEntityByTime(byte[] base, String entityType, Long limit, Long starttime, Long endtime, String fromId, Long fromTs, Collection secondaryFilters, EnumSet fields) throws IOException { - DBIterator iterator = null; + LeveldbIterator iterator = null; try { KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); // only db keys matching the prefix (base + entity type) will be parsed @@ -724,7 +728,7 @@ private TimelineEntities getEntityByTime(byte[] base, } TimelineEntities entities = new TimelineEntities(); - iterator = db.iterator(); + iterator = new LeveldbIterator(db); iterator.seek(first); // iterate until one of the following conditions is met: limit is // reached, there are no more keys, the key prefix no longer matches, @@ -783,10 +787,23 @@ private TimelineEntities getEntityByTime(byte[] base, } } return entities; + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(LOG, iterator); } } + + /** + * Handle error and set it in response. + */ + private static void handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode) { + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(errorCode); + response.addError(error); + } /** * Put a single entity. If there is an error, add a TimelinePutError to the @@ -812,11 +829,7 @@ private void put(TimelineEntity entity, TimelinePutResponse response, entity.getStartTime(), events); if (startAndInsertTime == null) { // if no start time is found, add an error and return - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entity.getEntityId()); - error.setEntityType(entity.getEntityType()); - error.setErrorCode(TimelinePutError.NO_START_TIME); - response.addError(error); + handleError(entity, response, TimelinePutError.NO_START_TIME); return; } revStartTime = writeReverseOrderedLong(startAndInsertTime @@ -883,11 +896,7 @@ private void put(TimelineEntity entity, TimelinePutResponse response, if (!domainId.equals(entity.getDomainId())) { // in this case the entity will be put, but the relation will be // ignored - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entity.getEntityId()); - error.setEntityType(entity.getEntityType()); - error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION); - response.addError(error); + handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION); continue; } } @@ -933,11 +942,7 @@ private void put(TimelineEntity entity, TimelinePutResponse response, if (entity.getDomainId() == null || entity.getDomainId().length() == 0) { if (!allowEmptyDomainId) { - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entity.getEntityId()); - error.setEntityType(entity.getEntityType()); - error.setErrorCode(TimelinePutError.NO_DOMAIN); - response.addError(error); + handleError(entity, response, TimelinePutError.NO_DOMAIN); return; } } else { @@ -946,14 +951,14 @@ private void put(TimelineEntity entity, TimelinePutResponse response, entity.getDomainId().getBytes()); } db.write(writeBatch); + } catch (DBException de) { + LOG.error("Error putting entity " + entity.getEntityId() + + " of type " + entity.getEntityType(), de); + handleError(entity, response, TimelinePutError.IO_EXCEPTION); } catch (IOException e) { LOG.error("Error putting entity " + entity.getEntityId() + " of type " + entity.getEntityType(), e); - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entity.getEntityId()); - error.setEntityType(entity.getEntityType()); - error.setErrorCode(TimelinePutError.IO_EXCEPTION); - response.addError(error); + handleError(entity, response, TimelinePutError.IO_EXCEPTION); } finally { lock.unlock(); writeLocks.returnLock(lock); @@ -983,15 +988,16 @@ private void put(TimelineEntity entity, TimelinePutResponse response, relatedEntity.getType(), relatedEntityStartTime), writeReverseOrderedLong(relatedEntityStartAndInsertTime .insertTime)); + } catch (DBException de) { + LOG.error("Error putting related entity " + relatedEntity.getId() + + " of type " + relatedEntity.getType() + " for entity " + + entity.getEntityId() + " of type " + entity.getEntityType(), de); + handleError(entity, response, TimelinePutError.IO_EXCEPTION); } catch (IOException e) { LOG.error("Error putting related entity " + relatedEntity.getId() + " of type " + relatedEntity.getType() + " for entity " + entity.getEntityId() + " of type " + entity.getEntityType(), e); - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entity.getEntityId()); - error.setEntityType(entity.getEntityType()); - error.setErrorCode(TimelinePutError.IO_EXCEPTION); - response.addError(error); + handleError(entity, response, TimelinePutError.IO_EXCEPTION); } finally { lock.unlock(); writeLocks.returnLock(lock); @@ -1072,23 +1078,27 @@ private byte[] getStartTime(String entityId, String entityType) private Long getStartTimeLong(String entityId, String entityType) throws IOException { EntityIdentifier entity = new EntityIdentifier(entityId, entityType); - // start time is not provided, so try to look it up - if (startTimeReadCache.containsKey(entity)) { - // found the start time in the cache - return startTimeReadCache.get(entity); - } else { - // try to look up the start time in the db - byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); - byte[] v = db.get(b); - if (v == null) { - // did not find the start time in the db - return null; + try { + // start time is not provided, so try to look it up + if (startTimeReadCache.containsKey(entity)) { + // found the start time in the cache + return startTimeReadCache.get(entity); } else { - // found the start time in the db - Long l = readReverseOrderedLong(v, 0); - startTimeReadCache.put(entity, l); - return l; + // try to look up the start time in the db + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + byte[] v = db.get(b); + if (v == null) { + // did not find the start time in the db + return null; + } else { + // found the start time in the db + Long l = readReverseOrderedLong(v, 0); + startTimeReadCache.put(entity, l); + return l; + } } + } catch(DBException e) { + throw new IOException(e); } } @@ -1152,28 +1162,32 @@ private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity, StartAndInsertTime startAndInsertTime = null; // create lookup key for start time byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); - // retrieve value for key - byte[] v = db.get(b); - if (v == null) { - // start time doesn't exist in db - if (suggestedStartTime == null) { - return null; + try { + // retrieve value for key + byte[] v = db.get(b); + if (v == null) { + // start time doesn't exist in db + if (suggestedStartTime == null) { + return null; + } + startAndInsertTime = new StartAndInsertTime(suggestedStartTime, + System.currentTimeMillis()); + + // write suggested start time + v = new byte[16]; + writeReverseOrderedLong(suggestedStartTime, v, 0); + writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8); + WriteOptions writeOptions = new WriteOptions(); + writeOptions.sync(true); + db.put(b, v, writeOptions); + } else { + // found start time in db, so ignore suggested start time + startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0), + readReverseOrderedLong(v, 8)); } - startAndInsertTime = new StartAndInsertTime(suggestedStartTime, - System.currentTimeMillis()); - - // write suggested start time - v = new byte[16]; - writeReverseOrderedLong(suggestedStartTime, v, 0); - writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8); - WriteOptions writeOptions = new WriteOptions(); - writeOptions.sync(true); - db.put(b, v, writeOptions); - } else { - // found start time in db, so ignore suggested start time - startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0), - readReverseOrderedLong(v, 8)); - } + } catch(DBException e) { + throw new IOException(e); + } startTimeWriteCache.put(entity, startAndInsertTime); startTimeReadCache.put(entity, startAndInsertTime.startTime); return startAndInsertTime; @@ -1373,7 +1387,7 @@ static int getStartTimeWriteCacheSize(Configuration conf) { @VisibleForTesting List getEntityTypes() throws IOException { - DBIterator iterator = null; + LeveldbIterator iterator = null; try { iterator = getDbIterator(false); List entityTypes = new ArrayList(); @@ -1396,6 +1410,8 @@ List getEntityTypes() throws IOException { iterator.seek(lookupKey); } return entityTypes; + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(LOG, iterator); } @@ -1406,7 +1422,7 @@ List getEntityTypes() throws IOException { * the given write batch. */ private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix, - DBIterator iterator) { + LeveldbIterator iterator) { for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) { byte[] key = iterator.peekNext().getKey(); if (!prefixMatches(prefix, prefix.length, key)) { @@ -1418,7 +1434,7 @@ private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix, @VisibleForTesting boolean deleteNextEntity(String entityType, byte[] reverseTimestamp, - DBIterator iterator, DBIterator pfIterator, boolean seeked) + LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked) throws IOException { WriteBatch writeBatch = null; try { @@ -1524,6 +1540,8 @@ boolean deleteNextEntity(String entityType, byte[] reverseTimestamp, writeOptions.sync(true); db.write(writeBatch, writeOptions); return true; + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(LOG, writeBatch); } @@ -1542,8 +1560,8 @@ void discardOldEntities(long timestamp) try { List entityTypes = getEntityTypes(); for (String entityType : entityTypes) { - DBIterator iterator = null; - DBIterator pfIterator = null; + LeveldbIterator iterator = null; + LeveldbIterator pfIterator = null; long typeCount = 0; try { deleteLock.writeLock().lock(); @@ -1583,21 +1601,25 @@ void discardOldEntities(long timestamp) } @VisibleForTesting - DBIterator getDbIterator(boolean fillCache) { + LeveldbIterator getDbIterator(boolean fillCache) { ReadOptions readOptions = new ReadOptions(); readOptions.fillCache(fillCache); - return db.iterator(readOptions); + return new LeveldbIterator(db, readOptions); } Version loadVersion() throws IOException { - byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY)); - // if version is not stored previously, treat it as 1.0. - if (data == null || data.length == 0) { - return Version.newInstance(1, 0); + try { + byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. + if (data == null || data.length == 0) { + return Version.newInstance(1, 0); + } + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); + return version; + } catch(DBException e) { + throw new IOException(e); } - Version version = - new VersionPBImpl(VersionProto.parseFrom(data)); - return version; } // Only used for test @@ -1726,6 +1748,8 @@ public void put(TimelineDomain domain) throws IOException { writeBatch.put(domainEntryKey, timestamps); writeBatch.put(ownerLookupEntryKey, timestamps); db.write(writeBatch); + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(LOG, writeBatch); } @@ -1754,13 +1778,15 @@ private static byte[] createOwnerLookupKey( @Override public TimelineDomain getDomain(String domainId) throws IOException { - DBIterator iterator = null; + LeveldbIterator iterator = null; try { byte[] prefix = KeyBuilder.newInstance() .add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup(); - iterator = db.iterator(); + iterator = new LeveldbIterator(db); iterator.seek(prefix); return getTimelineDomain(iterator, domainId, prefix); + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(LOG, iterator); } @@ -1769,12 +1795,12 @@ public TimelineDomain getDomain(String domainId) @Override public TimelineDomains getDomains(String owner) throws IOException { - DBIterator iterator = null; + LeveldbIterator iterator = null; try { byte[] prefix = KeyBuilder.newInstance() .add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup(); List domains = new ArrayList(); - for (iterator = db.iterator(), iterator.seek(prefix); + for (iterator = new LeveldbIterator(db), iterator.seek(prefix); iterator.hasNext();) { byte[] key = iterator.peekNext().getKey(); if (!prefixMatches(prefix, prefix.length, key)) { @@ -1809,13 +1835,15 @@ public int compare( TimelineDomains domainsToReturn = new TimelineDomains(); domainsToReturn.addDomains(domains); return domainsToReturn; + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(LOG, iterator); } } private static TimelineDomain getTimelineDomain( - DBIterator iterator, String domainId, byte[] prefix) throws IOException { + LeveldbIterator iterator, String domainId, byte[] prefix) throws IOException { // Iterate over all the rows whose key starts with prefix to retrieve the // domain information. TimelineDomain domain = new TimelineDomain(); @@ -1852,5 +1880,5 @@ private static TimelineDomain getTimelineDomain( } else { return domain; } - } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java index 5ebc96b627..d266aa27af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java @@ -45,7 +45,8 @@ import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; import org.apache.hadoop.yarn.server.timeline.NameValuePair; -import org.iq80.leveldb.DBIterator; +import org.apache.hadoop.yarn.server.utils.LeveldbIterator; +import org.iq80.leveldb.DBException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -146,13 +147,15 @@ public void testCacheSizes() { private boolean deleteNextEntity(String entityType, byte[] ts) throws IOException, InterruptedException { - DBIterator iterator = null; - DBIterator pfIterator = null; + LeveldbIterator iterator = null; + LeveldbIterator pfIterator = null; try { iterator = ((LeveldbTimelineStore)store).getDbIterator(false); pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false); return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts, iterator, pfIterator, false); + } catch(DBException e) { + throw new IOException(e); } finally { IOUtils.cleanup(null, iterator, pfIterator); }