YARN-6654. RollingLevelDBTimelineStore backwards incompatible after fst upgrade. Contributed by Jonathan Eagles
This commit is contained in:
parent
945c0958bb
commit
5f1ee72b0e
@ -28,6 +28,7 @@
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
@ -74,6 +75,7 @@
|
|||||||
import org.iq80.leveldb.ReadOptions;
|
import org.iq80.leveldb.ReadOptions;
|
||||||
import org.iq80.leveldb.WriteBatch;
|
import org.iq80.leveldb.WriteBatch;
|
||||||
import org.nustaq.serialization.FSTConfiguration;
|
import org.nustaq.serialization.FSTConfiguration;
|
||||||
|
import org.nustaq.serialization.FSTClazzNameRegistry;
|
||||||
|
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
|
|
||||||
@ -170,9 +172,22 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|||||||
.getLog(RollingLevelDBTimelineStore.class);
|
.getLog(RollingLevelDBTimelineStore.class);
|
||||||
private static FSTConfiguration fstConf =
|
private static FSTConfiguration fstConf =
|
||||||
FSTConfiguration.createDefaultConfiguration();
|
FSTConfiguration.createDefaultConfiguration();
|
||||||
|
// Fall back to 2.24 parsing if 2.50 parsing fails
|
||||||
|
private static FSTConfiguration fstConf224 =
|
||||||
|
FSTConfiguration.createDefaultConfiguration();
|
||||||
|
// Static class code for 2.24
|
||||||
|
private static final int LINKED_HASH_MAP_224_CODE = 83;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
fstConf.setShareReferences(false);
|
fstConf.setShareReferences(false);
|
||||||
|
fstConf224.setShareReferences(false);
|
||||||
|
// YARN-6654 unable to find class for code 83 (LinkedHashMap)
|
||||||
|
// The linked hash map was changed between 2.24 and 2.50 so that
|
||||||
|
// the static code for LinkedHashMap (83) was changed to a dynamic
|
||||||
|
// code.
|
||||||
|
FSTClazzNameRegistry registry = fstConf224.getClassRegistry();
|
||||||
|
registry.registerClass(
|
||||||
|
LinkedHashMap.class, LINKED_HASH_MAP_224_CODE, fstConf224);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@ -339,7 +354,7 @@ protected void serviceStart() throws Exception {
|
|||||||
deletionThread.start();
|
deletionThread.start();
|
||||||
}
|
}
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStop() throws Exception {
|
protected void serviceStop() throws Exception {
|
||||||
@ -365,7 +380,7 @@ private class EntityDeletionThread extends Thread {
|
|||||||
private final long ttl;
|
private final long ttl;
|
||||||
private final long ttlInterval;
|
private final long ttlInterval;
|
||||||
|
|
||||||
public EntityDeletionThread(Configuration conf) {
|
EntityDeletionThread(Configuration conf) {
|
||||||
ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
|
ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
|
||||||
DEFAULT_TIMELINE_SERVICE_TTL_MS);
|
DEFAULT_TIMELINE_SERVICE_TTL_MS);
|
||||||
ttlInterval = conf.getLong(
|
ttlInterval = conf.getLong(
|
||||||
@ -479,9 +494,15 @@ private static TimelineEntity getEntity(String entityId, String entityType,
|
|||||||
try {
|
try {
|
||||||
o = fstConf.asObject(iterator.peekNext().getValue());
|
o = fstConf.asObject(iterator.peekNext().getValue());
|
||||||
entity.addOtherInfo(keyStr, o);
|
entity.addOtherInfo(keyStr, o);
|
||||||
} catch (Exception e) {
|
} catch (Exception ignore) {
|
||||||
LOG.warn("Error while decoding "
|
try {
|
||||||
+ entityId + ":otherInfo:" + keyStr, e);
|
// Fall back to 2.24 parser
|
||||||
|
o = fstConf224.asObject(iterator.peekNext().getValue());
|
||||||
|
entity.addOtherInfo(keyStr, o);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Error while decoding "
|
||||||
|
+ entityId + ":otherInfo:" + keyStr, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
|
} else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
|
||||||
@ -1348,8 +1369,13 @@ private static TimelineEvent getEntityEvent(Set<String> eventTypes,
|
|||||||
Object o = null;
|
Object o = null;
|
||||||
try {
|
try {
|
||||||
o = fstConf.asObject(value);
|
o = fstConf.asObject(value);
|
||||||
} catch (Exception e) {
|
} catch (Exception ignore) {
|
||||||
LOG.warn("Error while decoding " + tstype, e);
|
try {
|
||||||
|
// Fall back to 2.24 parser
|
||||||
|
o = fstConf224.asObject(value);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Error while decoding " + tstype, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (o == null) {
|
if (o == null) {
|
||||||
event.setEventInfo(null);
|
event.setEventInfo(null);
|
||||||
@ -1378,8 +1404,14 @@ private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
|
|||||||
try {
|
try {
|
||||||
value = fstConf.asObject(bytes);
|
value = fstConf.asObject(bytes);
|
||||||
entity.addPrimaryFilter(name, value);
|
entity.addPrimaryFilter(name, value);
|
||||||
} catch (Exception e) {
|
} catch (Exception ignore) {
|
||||||
LOG.warn("Error while decoding " + name, e);
|
try {
|
||||||
|
// Fall back to 2.24 parser
|
||||||
|
value = fstConf224.asObject(bytes);
|
||||||
|
entity.addPrimaryFilter(name, value);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Error while decoding " + name, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user