HDFS-11047. Remove deep copies of FinalizedReplica to alleviate heap consumption on DataNode. Contributed by Xiaobing Zhou

This commit is contained in:
Mingliang Liu 2016-10-27 15:58:09 -07:00
parent f3ac1f41b8
commit 9e03ee5279
3 changed files with 28 additions and 13 deletions

View File

@ -22,6 +22,7 @@
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -398,14 +399,13 @@ private void scan() {
diffs.put(bpid, diffRecord); diffs.put(bpid, diffRecord);
statsRecord.totalBlocks = blockpoolReport.length; statsRecord.totalBlocks = blockpoolReport.length;
List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid); final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
ReplicaInfo[] memReport = bl.toArray(new ReplicaInfo[bl.size()]); Collections.sort(bl); // Sort based on blockId
Arrays.sort(memReport); // Sort based on blockId
int d = 0; // index for blockpoolReport int d = 0; // index for blockpoolReport
int m = 0; // index for memReprot int m = 0; // index for memReprot
while (m < memReport.length && d < blockpoolReport.length) { while (m < bl.size() && d < blockpoolReport.length) {
ReplicaInfo memBlock = memReport[m]; ReplicaInfo memBlock = bl.get(m);
ScanInfo info = blockpoolReport[d]; ScanInfo info = blockpoolReport[d];
if (info.getBlockId() < memBlock.getBlockId()) { if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) { if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
@ -452,8 +452,8 @@ private void scan() {
++m; ++m;
} }
} }
while (m < memReport.length) { while (m < bl.size()) {
ReplicaInfo current = memReport[m++]; ReplicaInfo current = bl.get(m++);
addDifference(diffRecord, statsRecord, addDifference(diffRecord, statsRecord,
current.getBlockId(), current.getVolume()); current.getBlockId(), current.getVolume());
} }

View File

@ -229,7 +229,16 @@ StorageReport[] getStorageReports(String bpid)
*/ */
VolumeFailureSummary getVolumeFailureSummary(); VolumeFailureSummary getVolumeFailureSummary();
/** @return a list of finalized blocks for the given block pool. */ /**
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
* pool.
*/
List<ReplicaInfo> getFinalizedBlocks(String bpid); List<ReplicaInfo> getFinalizedBlocks(String bpid);
/** @return a list of finalized blocks for the given block pool. */ /** @return a list of finalized blocks for the given block pool. */

View File

@ -1714,17 +1714,23 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
} }
/** /**
* Get the list of finalized blocks from in-memory blockmap for a block pool. * Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
* pool.
*/ */
@Override @Override
public List<ReplicaInfo> getFinalizedBlocks(String bpid) { public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetLock.acquire()) {
ArrayList<ReplicaInfo> finalized = final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
new ArrayList<ReplicaInfo>(volumeMap.size(bpid)); volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) { for (ReplicaInfo b : volumeMap.replicas(bpid)) {
if (b.getState() == ReplicaState.FINALIZED) { if (b.getState() == ReplicaState.FINALIZED) {
finalized.add(new ReplicaBuilder(ReplicaState.FINALIZED) finalized.add(b);
.from(b).build());
} }
} }
return finalized; return finalized;